From d4c421bcaaa274b669c6d4c8a4fa702901c6c735 Mon Sep 17 00:00:00 2001 From: Greg Cochard Date: Fri, 10 Jul 2020 09:05:49 -0700 Subject: [PATCH 1/6] Add Fido2 support Drop u2f support Drop python 2.7 support Freshen copyrights Update version Update author --- LICENSE | 4 +- README.md | 23 +-- alohomora/__init__.py | 15 +- alohomora/keys.py | 4 +- alohomora/main.py | 4 +- alohomora/req.py | 398 ++++++++++++++++++++++++++++-------------- alohomora/saml.py | 2 +- setup.py | 4 +- 8 files changed, 280 insertions(+), 174 deletions(-) diff --git a/LICENSE b/LICENSE index b2884d3..a89942c 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright 2020, Viasat, Inc. +Copyright 2022, Viasat, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this work except in compliance with the License. @@ -10,4 +10,4 @@ Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and -limitations under the License. \ No newline at end of file +limitations under the License. diff --git a/README.md b/README.md index 4811140..1b2c7e7 100644 --- a/README.md +++ b/README.md @@ -14,26 +14,15 @@ but the choice is yours. pip install alohomora -### Optional U2F support installation +### Optional WebAuthN support installation -Alohomora has optional U2F support, which can be installed alongside alohomora using pip: +Alohomora has optional Fido2/WebAuthN support which can be installed alongside alohomora using pip: - pip install alohomora[u2f] + pip install alohomora[fido2] -Please note that this requires a few OS level packages to be installed. -For Centos 7, it requires the following: - - yum groupinstall 'Development Tools' - yum install python[2,3]-devel - yum install libusbx-devel - yum install systemd-devel - -For Debian based systems, you'll need the following: - - apt install build-essential - apt install python[2,3]-dev - apt install libusb-1.0-0-dev - apt install libudev-dev +Please note that this may require a few OS level packages to be installed due to the transitive +dependency on the cryptography package. See the [installation guide](https://cryptography.io/en/latest/installation/) +for instructions specific to your environment. ## Basic Configuration diff --git a/alohomora/__init__.py b/alohomora/__init__.py index 5d297cc..9a2bf15 100644 --- a/alohomora/__init__.py +++ b/alohomora/__init__.py @@ -1,6 +1,6 @@ """Alohomora helper module""" -# Copyright 2020 Viasat, Inc. +# Copyright 2022 Viasat, Inc. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,19 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import print_function - import sys -try: - input = raw_input -except NameError: - pass - -__version__ = '2.4.0' -__author__ = 'Stephan Kemper' +__version__ = '3.0.0' +__author__ = 'Viasat' __author_email__ = 'vice-support@viasat.com' -__license__ = '(c) 2020 Viasat, Inc. See the LICENSE file for more details.' +__license__ = '(c) 2022 Viasat, Inc. See the LICENSE file for more details.' __url__ = 'https://github.com/Viasat/alohomora' __description__ = 'Get AWS API keys for a SAML-federated identity' diff --git a/alohomora/keys.py b/alohomora/keys.py index 7e0e2e2..ec7df4c 100644 --- a/alohomora/keys.py +++ b/alohomora/keys.py @@ -1,6 +1,6 @@ """Handles getting and saving AWS API keys""" -# Copyright 2020 Viasat, Inc. +# Copyright 2022 Viasat, Inc. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,8 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import print_function - import os try: diff --git a/alohomora/main.py b/alohomora/main.py index 3a4b866..888dd7c 100755 --- a/alohomora/main.py +++ b/alohomora/main.py @@ -2,7 +2,7 @@ alohomora console script ''' -# Copyright 2020 Viasat, Inc. +# Copyright 2022 Viasat, Inc. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import print_function - import argparse import getpass import logging diff --git a/alohomora/req.py b/alohomora/req.py index 5ea7b6a..235440f 100644 --- a/alohomora/req.py +++ b/alohomora/req.py @@ -1,6 +1,6 @@ """The workhorse functions that make web requests.""" -# Copyright 2020 Viasat, Inc. +# Copyright 2022 Viasat, Inc. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,13 +15,13 @@ # limitations under the License. # pylint: disable=too-few-public-methods,too-many-branches,too-many-locals,too-many-statements -from __future__ import print_function import re import json import logging import time import os +import base64 try: import urlparse @@ -38,43 +38,42 @@ import alohomora -U2F_SUPPORT = False +FIDO2_SUPPORT = False try: - from u2flib_host import u2f, exc - from u2flib_host.constants import APDU_USE_NOT_SATISFIED, APDU_WRONG_DATA - U2F_SUPPORT = True -except ImportError: - pass - -try: - input = raw_input #pylint: disable=redefined-builtin,invalid-name -except NameError: + from fido2.rpid import verify_rp_id + from fido2.hid import CtapHidDevice + from fido2.ctap import STATUS + from fido2.client import Fido2Client, ClientError + from fido2.utils import websafe_encode, websafe_decode + FIDO2_SUPPORT = True +except ImportError as err: pass LOG = logging.getLogger('alohomora.req') -def get_u2f_devices(): - """Get all U2F devices attached to the machine""" - devices = u2f.list_devices() - for device in devices: - try: - device.open() - except: # pylint: disable=bare-except - devices.remove(device) - return devices +def get_wa_devices(): + '''Return all eligible webauthn devices''' + return sorted(list(CtapHidDevice.list_devices()), key=lambda k: k.product_name, reverse=True) +ok_prompted = False +def on_keepalive(status): + '''Print the tap prompt''' + global ok_prompted + if status == STATUS.UPNEEDED and ok_prompted == False: # Waiting for touch + print('Please tap your security key...') + ok_prompted = True -if U2F_SUPPORT: +if FIDO2_SUPPORT: try: - get_u2f_devices() - U2F_SUPPORT = True + devices = get_wa_devices() + FIDO2_SUPPORT = True except: # pylint: disable=bare-except - U2F_SUPPORT = False + FIDO2_SUPPORT = False -class DuoDevice(object): +class DuoDevice(): """A Duo authentication device""" def __init__(self, requests_thing): self.value = requests_thing.get('value').strip() @@ -84,7 +83,7 @@ def __repr__(self): return "%s/%s" % (self.name, self.value) -class DuoFactor(object): +class DuoFactor(): """A Duo device factor""" def __init__(self, name): self.value = None @@ -94,7 +93,7 @@ def __repr__(self): return "%s/%s" % (self.name, self.value) -class WebProvider(object): +class WebProvider(): """A provider of authentication data from some web source""" def login_one_factor(self, username, password): """Authenticates the user to the IDP with a primary factor (username and password)""" @@ -114,86 +113,172 @@ def __init__(self, idp_url, auth_method=None): self.idp_url = idp_url self.auth_method = auth_method - def _validate_u2f_request(self, host, req): - LOG.debug('req["appId"]: %s, host: %s', req['appId'], host) - return req['appId'] == 'https://%s' % host + def _validate_webauthn_request(self, host, appid): + LOG.debug('req["appid"]: %s, host: %s', appid, host) + return appid == 'https://%s' % host - def _get_u2f_response(self, reqs): + def _get_webauthn_response(self, req): """ Authenticates against yubikey with all the sign requests """ - # if U2F enrolled, requests will look like - # [ - # { - # "appId": "https://api-12345678.duosecurity.com", - # "challenge": "shfdsjkaKJDGHFSKgfesgfieo2382", - # "keyHandle": "fjdskabghpferwuipgt4iuytr23g4uyiawhbiu", - # "sessionId": "jrt43uiq9tpgh43qu9gbhw3juipgtbw3", - # "version": "U2F_V2" - # }, - # { ... } - # ] - _ = input('Please ensure your security key is plugged in and hit enter...') - devices = get_u2f_devices() - if not devices: - raise IOError('no U2F devices found') - LOG.info('U2F requests: %s', reqs) - try: + # if webauthn enrolled, requests will look like + # { + # "extensions": {"appId": "https://api-12345678.duosecurity.com"}, + # "challenge": "shfdsjkaKJDGHFSKgfesgfieo2382", + # "allowCredentials": [{ + # "transports": ["usb", "nfc", "ble"], + # "type": "public-key", + # "id": "dipB0Q2TgTSpOINIsI9uaesA4ZrI1nGoeKc3Dx-VOvAJ1knOY46MzjY3da14KcTzLPzlIJF9p9gtqr2t6TfWeQ" + # }] + # "sessionId": "dn6WlN9Uunff3ZLSZuu9bdHTr1Nhj0p7Ov89ZcR77nI", + # "userVerification": "discouraged", + # "rpId": "duosecurity.com", + # "timeout": 60000 + # } + def _copy(item): + i = {} + for k, v in item.items(): + i[k] = v + return i + + wa_devices = get_wa_devices() + if not wa_devices: + raise IOError('no FIDO2 devices found') + LOG.info('WebauthN requests: %s', req) + cred_id = req['allowCredentials'][0]['id'] + session_id = req['sessionId'] + fido_req = dict() + for k, v in req.items(): + if k in ['challenge', 'timeout', 'rpId', 'allowCredentials', 'userVerification', 'extensions']: + if k == 'challenge': + fido_req[k] = websafe_decode(v) + elif k == 'allowCredentials': + fido_req[k] = [] + for c in v: + i = _copy(c) + i['id'] = websafe_decode(i['id']) + fido_req[k].append(i) + else: + fido_req[k] = v + # pass challenge to all devices sequentially + for device in wa_devices: prompted = False - valid_pairs = [] - removed = [] - # enumerate valid pairs of device: request - for device in devices: - LOG.debug('trying device %s', device) - remove = True - for request in reqs: - try: - return u2f.authenticate(device, json.dumps(request), request['appId']) - except exc.APDUError as e: #pylint: disable=invalid-name - if e.code == APDU_USE_NOT_SATISFIED: - valid_pairs.append({'device': device, 'request': request}) - LOG.debug('device %s just needs a little push', device) - remove = False - if not prompted: - print('Please tap your security key...') - prompted = True - elif e.code == APDU_WRONG_DATA: - LOG.debug('device/request mismatch') - else: - LOG.error('device %s has other problems: %s', device, e) - except exc.DeviceError: - LOG.error('DeviceError') - if remove: - LOG.debug('removing device %s', device) - removed.append(device) - for dev in removed: - dev.close() - time.sleep(0.5) - # now loop only over the valid pairs - while valid_pairs: - for pair in valid_pairs: - device, request = pair['device'], pair['request'] - try: - return u2f.authenticate(device, json.dumps(request), request['appId']) - except exc.APDUError as e: #pylint: disable=invalid-name - if e.code == APDU_USE_NOT_SATISFIED: - # can't imagine getting here, but I'll leave it in - if not prompted: - print('Please tap your security key...') - prompted = True - elif e.code == APDU_WRONG_DATA: - LOG.debug('device/request mismatch') - valid_pairs.remove(pair) - else: - LOG.error('device %s has other problems: %s', device, e) - time.sleep(0.25) - finally: - for device in devices: + ok_prompted = False + client = Fido2Client(device, req['extensions']['appid'], verify_rp_id) + LOG.debug('trying device %s with req %s', client, fido_req) + try: + if not prompted: + print('Please tap your security key...') + prompted = True + wa_resp = client.get_assertion(fido_req, on_keepalive=on_keepalive).get_response(0) + LOG.debug('wa_resp: %s', wa_resp) + LOG.debug('wa_resp.client_data: %s', wa_resp.client_data) + LOG.debug('wa_resp.authenticator_data: %s', wa_resp.authenticator_data) + + def b64enc(buf): + return websafe_encode(buf) + + def b64RawEnc(buf): + return base64.b64encode(buf).decode('utf-8').replace('+','-').replace('/','_') + + def hexEncode(buf): + return buf.hex() + + ## encode relevant fields, based on the following snippet from duo ## + """ + full message: + {"sid":"MGEzMDE3NGFjNmUwNDA1Yzk4MDZkNzdhOTRlODI0NWY=|104.129.198.109|1649389439|84db9f7e589d6ac2e82b135c60d06fa92dca9c75","device":"webauthn_credential","factor":"webauthn_finish","response_data":"{\\"sessionId\\":\\"AoL2xv58zp6_lMiKSobkVYNBoU9ZhlzzlBLb7uQ-VPc\\",\\"id\\":\\"dipB0Q2TgTSpOINIsI9uaesA4ZrI1nGoeKc3Dx-VOvAJ1knOY46MzjY3da14KcTzLPzlIJF9p9gtqr2t6TfWeQ\\",\\"rawId\\":\\"dipB0Q2TgTSpOINIsI9uaesA4ZrI1nGoeKc3Dx-VOvAJ1knOY46MzjY3da14KcTzLPzlIJF9p9gtqr2t6TfWeQ\\",\\"type\\":\\"public-key\\",\\"authenticatorData\\":\\"5Gq7wMECJqSk1hzZqsDdJP5jU8V79HTtuMIpu6_lawUBAAANQA==\\",\\"clientDataJSON\\":\\"eyJ0eXBlIjoid2ViYXV0aG4uZ2V0IiwiY2hhbGxlbmdlIjoiRnE1QWwxNFZnZng0UUpxRFVHQnBSNWdaQlJDUkhMdnciLCJvcmlnaW4iOiJodHRwczovL2FwaS02OTI2NzkxOC5kdW9zZWN1cml0eS5jb20iLCJjcm9zc09yaWdpbiI6ZmFsc2V9\\",\\"signature\\":\\"3046022100af6749afdca444ca389c91143b256d6fa2b3be71f2da8ec71a30661c9cd9524a022100eb5083196d0076bf7235db9a44c846cc56922a5aa3b9c135ad8b34b49aeeae31\\",\\"extensionResults\\":{\\"appid\\":false}}","out_of_date":"","days_out_of_date":"","days_to_block":"None"} + parsed response_data: + { + "sessionId":"AoL2xv58zp6_lMiKSobkVYNBoU9ZhlzzlBLb7uQ-VPc", + "id":"dipB0Q2TgTSpOINIsI9uaesA4ZrI1nGoeKc3Dx-VOvAJ1knOY46MzjY3da14KcTzLPzlIJF9p9gtqr2t6TfWeQ", + "rawId":"dipB0Q2TgTSpOINIsI9uaesA4ZrI1nGoeKc3Dx-VOvAJ1knOY46MzjY3da14KcTzLPzlIJF9p9gtqr2t6TfWeQ", + "type":"public-key", + "authenticatorData":"5Gq7wMECJqSk1hzZqsDdJP5jU8V79HTtuMIpu6_lawUBAAANQA==", + "clientDataJSON":"eyJ0eXBlIjoid2ViYXV0aG4uZ2V0IiwiY2hhbGxlbmdlIjoiRnE1QWwxNFZnZng0UUpxRFVHQnBSNWdaQlJDUkhMdnciLCJvcmlnaW4iOiJodHRwczovL2FwaS02OTI2NzkxOC5kdW9zZWN1cml0eS5jb20iLCJjcm9zc09yaWdpbiI6ZmFsc2V9", + "signature":"3046022100af6749afdca444ca389c91143b256d6fa2b3be71f2da8ec71a30661c9cd9524a022100eb5083196d0076bf7235db9a44c846cc56922a5aa3b9c135ad8b34b49aeeae31", + "extensionResults":{"appid":false}} + + https://api-69267918.duosecurity.com/frame/static/js/page/v3/prompt.js?v=73485:formatted + + exports.b64enc = function(buf) { + return base64js.fromByteArray(buf).replace(/\+/g, "-").replace(/\//g, "_").replace(/=/g, "") + } + , + exports.b64RawEnc = function(buf) { + return base64js.fromByteArray(buf).replace(/\+/g, "-").replace(/\//g, "_") + } + , + exports.hexEncode = function(buf) { + return Array.from(buf).map(function(x) { + return ("0" + x.toString(16)).substr(-2) + }).join("") + } + _transformAssertionData: function(sid, assertionData, options) { + void 0 === options && (options = {}); + var authenticatorData = new Uint8Array(assertionData.response.authenticatorData) + , clientDataJSON = new Uint8Array(assertionData.response.clientDataJSON) + , rawId = new Uint8Array(assertionData.rawId) + , signature = new Uint8Array(assertionData.response.signature) + , wData = { + sessionId: assertionData.sessionId, + id: assertionData.id, + rawId: (0, + _b.b64enc)(rawId), + type: assertionData.type, + authenticatorData: (0, + _b.b64RawEnc)(authenticatorData), + clientDataJSON: (0, + _b.b64RawEnc)(clientDataJSON), + signature: (0, + _b.hexEncode)(signature), + extensionResults: assertionData.extensionResults + }; + return _jquery.extend({ + sid: sid, + device: "webauthn_credential", + factor: "webauthn_finish", + response_data: JSON.stringify(wData) + }, options) + sessionId, id, rawId, type, authenticatorData, clientDataJSON, signature, extensionResults + """ + + # signature is hex # + resp = dict() + resp['signature'] = hexEncode(wa_resp.signature) + + # authenticatorData is a url-safe base64-encoded blob # + flags = wa_resp.authenticator_data.flags + counter = wa_resp.authenticator_data.counter + rpid_hash = wa_resp.authenticator_data.rp_id_hash + resp['authenticatorData'] = b64RawEnc(wa_resp.authenticator_data) + + # clientDataJSON is a base64-encoded JSON blob # + resp['clientDataJSON'] = b64RawEnc(wa_resp.client_data) + + # extensionResults needs some massaging # + resp['extensionResults'] = dict(appid=False) + raw_id = b64enc(wa_resp.get('credentialId')) + resp['rawId'] = raw_id + resp['id'] = cred_id + resp['sessionId'] = session_id + resp['type'] = 'public-key' + return resp + + # TODO: figure out the possible errors for FIDO2 + except ClientError as e: #pylint: disable=invalid-name + if e.code == ClientError.ERR.DEVICE_INELIGIBLE and len(list(devices)) > 1: + print('Please try another authenticator') + continue + else: + continue + LOG.error(e) + finally: device.close() - answer = input('No registered U2F device found, retry? [Y/n]') + answer = input('No registered WebauthN device found, retry? [Y/n]') if answer in ('Y', 'y', ''): - return self._get_u2f_response(reqs) - raise RuntimeWarning('No registered U2F device found') + return self._get_webauthn_response(req) + raise RuntimeWarning('No registered WebauthN device found') def login_one_factor(self, username, password): self.session = requests.Session() @@ -283,7 +368,14 @@ def login_two_factor(self, response_1fa, auth_device=None): frame_url = 'https://%s/frame/web/v1/auth?tx=%s&parent=%s&v=2.3' % \ (duo_host, duo_sig, response_1fa.url) LOG.info('Getting Duo iframe') - (response, soup) = self._do_get(frame_url) + # if the duo integration has an allowed origin list, we must + # pass the page URL as a Referer header in addition to using + # the `parent` query parameter in the frame URL + origin_duo_host = 'https://%s' % duo_host + (response, soup) = self._do_get(frame_url, headers={ + 'Referer': response_1fa.url, + 'Origin': origin_duo_host + }) payload = {} for inputtag in soup.find_all('input'): @@ -301,20 +393,39 @@ def login_two_factor(self, response_1fa, auth_device=None): device = self._get_duo_device(soup, auth_device) factor = self._get_auth_factor(soup, device) + do_wa = device.value.startswith('WA') # Finally send the POST request for an auth to Duo payload = { 'sid': sid, - 'device': device.value if ( - device.name != "Security Key (U2F)" - and not device.value.startswith('WA')) else "u2f_token", - 'factor': factor.name if ( - device.name != "Security Key (U2F)" - and not device.value.startswith('WA')) else "U2F Token", + 'device': device.value, + 'factor': factor.name, 'out_of_date': '' } + LOG.debug("Payload: %s", payload) if factor.name == "Passcode": payload['passcode'] = factor.value - headers = {'Referer': response.url} + prompt_sid_url = response.url + headers = {'Referer': prompt_sid_url, + 'Origin': origin_duo_host} + if do_wa: + # pull in the webauthn prompt + popup_url = 'https://%s%s/webauthn_auth_popup?sid=%s&wkey=%s' % (duo_host, new_action, sid, device.value) + LOG.debug("Popup URL: %s", popup_url) + (status, soup) = self._do_get( + popup_url, + headers=headers) + xsrf_token = soup.find('input').get('value', '') + payload = { + 'sid': sid, + 'device': device.value, + 'factor': factor.name if ( + device.name != "Security Key (U2F)" + and not device.value.startswith('WA')) else "WebAuthn Credential", + } + headers = {'Referer': popup_url, 'Origin': origin_duo_host} + + #headers = {'Referer': prompt_sid_url} + #headers = {'Referer': prompt_sid_url, 'Origin': duo_host} (status, _) = self._do_post( 'https://%s%s' % (duo_host, new_action), data=payload, @@ -323,13 +434,19 @@ def login_two_factor(self, response_1fa, auth_device=None): # Response is of form # {"stat": "OK", "response": {"txid": "f95cbacc-151c-43a6-b462-b33420e72633"}} - txid = json.loads(status.text)['response']['txid'] + LOG.debug("Received response: %s", status.text) + txid = '' + response = json.loads(status.text)['response'] + txid = response['txid'] + LOG.debug("Received response %s", response) LOG.debug("Received transaction ID %s", txid) + headers = {'Referer': prompt_sid_url, 'Origin': origin_duo_host} # Initial call will NOT block (status, _) = self._do_post( 'https://%s/frame/status' % duo_host, data={'sid': sid, 'txid': txid}, + headers=headers, soup=False) # text from this will be something like # { @@ -348,6 +465,22 @@ def login_two_factor(self, response_1fa, auth_device=None): # "u2f_sign_request": [{"keyHandle": "..."}, {...}] # } # } + # if webauthn enrolled, text will be like: + # { + # 'stat': 'OK', + # 'response': { + # 'status': 'Use your Security Key to log in.', + # 'status_code': 'webauthn_sent', + # 'status_body_msg': 'Use your security key to log in.', + # 'webauthn_credential_request_options': { + # 'allowCredentials': [{'transports': ['usb', 'nfc', 'ble'], 'type': 'public-key', 'id': 'dipB0Q2TgTSpOINIsI9uaesA4ZrI1nGoeKc3Dx-VOvAJ1knOY46MzjY3da14KcTzLPzlIJF9p9gtqr2t6TfWeQ'}], + # 'challenge': 'jUXmEDWAxx7b3jPxu57vGu7xvfWAulE8', 'rpId': 'duosecurity.com', + # 'timeout': 60000, 'sessionId': 'dn6WlN9Uunff3ZLSZuu9bdHTr1Nhj0p7Ov89ZcR77nI', + # 'userVerification': 'discouraged', 'extensions': {'appid': 'https://api-69267918.duosecurity.com'} + # } + # } + # } + status_data = json.loads(status.text) LOG.info(str(status_data)) if status_data['stat'] != 'OK': @@ -357,24 +490,28 @@ def login_two_factor(self, response_1fa, auth_device=None): allowed = status_data['response']['status_code'] == 'allow' # there should never be a case where `allowed` is True if the user picked Security Key - if device.name == "Security Key (U2F)" or device.value.startswith('WA'): - challenges = [r for r in status_data['response']['u2f_sign_request'] - if self._validate_u2f_request(duo_host, r)] - resp = self._get_u2f_response(challenges) - # pull the first challenge's sessionId since they all match - # the challenges list should not be empty here, as the device would not be presented - # to the user without a corresponding challenge - resp['sessionId'] = challenges[0]['sessionId'] + if status_data['response']['status_code'] == 'webauthn_sent': + opts = status_data['response']['webauthn_credential_request_options'] + challenges = [r for r in opts['allowCredentials'] + if self._validate_webauthn_request(duo_host, opts['extensions']['appid'])] + if not challenges: + alohomora.die('Sorry, there was a problem talking to Duo.') + resp = self._get_webauthn_response(opts) + LOG.debug(resp) + # include the session ID as passed to us earlier payload['sid'] = sid - # u2f_token and u2f_finish are magic strings here - payload['device'] = 'u2f_token' - payload['factor'] = 'u2f_finish' + # webauthn_credential and webauthn_finish are magic strings here + payload['device'] = 'webauthn_credential' + payload['factor'] = 'webauthn_finish' # these are a copy/paste from the duo integration's POST data payload['out_of_date'] = None + payload['days_out_of_date'] = None payload['days_to_block'] = 'None' # finally, the response data itself needs to be a JSON string - payload['response_data'] = json.dumps(resp) + payload['response_data'] = json.dumps(resp,separators=(',', ':')) + + LOG.debug(payload) (status, _) = self._do_post( 'https://%s%s' % (duo_host, new_action), @@ -385,12 +522,13 @@ def login_two_factor(self, response_1fa, auth_device=None): # Response is of form # {"stat": "OK", "response": {"txid": "f95cbacc-151c-43a6-b462-b33420e72633"}} txid = json.loads(status.text)['response']['txid'] - LOG.debug("Received transaction ID %s", txid) + LOG.debug("Received transaction ID %s from response %s", txid, status.text) # Initial call will NOT block (status, _) = self._do_post( 'https://%s/frame/status' % duo_host, data={'sid': sid, 'txid': txid}, + headers=headers, soup=False) status_data = json.loads(status.text) LOG.info(str(status_data)) @@ -484,17 +622,7 @@ def _get_duo_device(self, soup, auth_device): #pylint: disable=no-self-use supported_devices = ['phone', 'phone1', 'phone2', 'token', 'token1', 'token2'] # allow Security Keys by "name" not by "value", as value is a unique ID devices = [dev for dev in devices if dev.value in supported_devices or ( - U2F_SUPPORT and (dev.name == 'Security Key (U2F)' or dev.value.startswith('WA')))] - u2f_in_devices = False - # and now to offer a single "Security Key (U2F)" option, since we try all of them - deduped_devices = [] - for dev in devices: - if dev.name == 'Security Key (U2F)': - if u2f_in_devices: - continue - u2f_in_devices = True - deduped_devices.append(dev) - devices = deduped_devices + FIDO2_SUPPORT and dev.value.startswith('WA'))] LOG.debug("Acceptable devices: %s" % devices) @@ -527,7 +655,7 @@ def _get_duo_device(self, soup, auth_device): #pylint: disable=no-self-use def _get_auth_factor(self, soup, device): #pylint: disable=inconsistent-return-statements LOG.debug('Looking up auth factor options for %s', device.value) if device.name == 'Security Key (U2F)' or device.value.startswith('WA'): - return DuoFactor('u2f_factor') + return DuoFactor('webauthn_credential') for tag in soup.find_all('fieldset'): if tag.get('data-device-index') == device.value: factors = [] diff --git a/alohomora/saml.py b/alohomora/saml.py index 0d7c2d5..7ceae2d 100644 --- a/alohomora/saml.py +++ b/alohomora/saml.py @@ -1,6 +1,6 @@ """Does some work parsing SAML assertions""" -# Copyright 2020 Viasat, Inc. +# Copyright 2022 Viasat, Inc. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/setup.py b/setup.py index 369c360..f5d5cd3 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,4 @@ -# Copyright 2020 ViaSat, Inc. +# Copyright 2022 ViaSat, Inc. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -37,6 +37,6 @@ "requests>=2.11.1", ], extras_require={ - "u2f": ["python-u2flib-host>=3.0.3"] + "fido2": ["fido2>=0.9.3"] } ) From 13f2a03d71f705cbe1b74345f661e25b86d4cb02 Mon Sep 17 00:00:00 2001 From: Greg Cochard Date: Wed, 1 Jun 2022 09:58:21 -0700 Subject: [PATCH 2/6] Address review comments - Clean up pylint warnings - Move to f-strings throughout --- alohomora/req.py | 149 +++++++++++++++++++++-------------------------- 1 file changed, 68 insertions(+), 81 deletions(-) diff --git a/alohomora/req.py b/alohomora/req.py index 235440f..0773320 100644 --- a/alohomora/req.py +++ b/alohomora/req.py @@ -57,17 +57,16 @@ def get_wa_devices(): '''Return all eligible webauthn devices''' return sorted(list(CtapHidDevice.list_devices()), key=lambda k: k.product_name, reverse=True) -ok_prompted = False +prompted = set() def on_keepalive(status): '''Print the tap prompt''' - global ok_prompted - if status == STATUS.UPNEEDED and ok_prompted == False: # Waiting for touch + if status == STATUS.UPNEEDED and 'ok' not in prompted: # Waiting for touch print('Please tap your security key...') - ok_prompted = True + prompted.add('ok') if FIDO2_SUPPORT: try: - devices = get_wa_devices() + _fido_devices = get_wa_devices() FIDO2_SUPPORT = True except: # pylint: disable=bare-except FIDO2_SUPPORT = False @@ -115,7 +114,7 @@ def __init__(self, idp_url, auth_method=None): def _validate_webauthn_request(self, host, appid): LOG.debug('req["appid"]: %s, host: %s', appid, host) - return appid == 'https://%s' % host + return appid == f'https://{host}' def _get_webauthn_response(self, req): """ @@ -126,20 +125,16 @@ def _get_webauthn_response(self, req): # "extensions": {"appId": "https://api-12345678.duosecurity.com"}, # "challenge": "shfdsjkaKJDGHFSKgfesgfieo2382", # "allowCredentials": [{ - # "transports": ["usb", "nfc", "ble"], - # "type": "public-key", - # "id": "dipB0Q2TgTSpOINIsI9uaesA4ZrI1nGoeKc3Dx-VOvAJ1knOY46MzjY3da14KcTzLPzlIJF9p9gtqr2t6TfWeQ" + # "transports": ["usb", "nfc", "ble"], + # "type": "public-key", + # "id": + # "dipB0Q2TgTSpOINIsI9uaesA4ZrI1nGoeKc3Dx-VOvAJ1knOY46MzjY3da14KcTzLPzlIJF9p9gtqr2t6TfWeQ" # }] # "sessionId": "dn6WlN9Uunff3ZLSZuu9bdHTr1Nhj0p7Ov89ZcR77nI", # "userVerification": "discouraged", # "rpId": "duosecurity.com", # "timeout": 60000 # } - def _copy(item): - i = {} - for k, v in item.items(): - i[k] = v - return i wa_devices = get_wa_devices() if not wa_devices: @@ -148,28 +143,28 @@ def _copy(item): cred_id = req['allowCredentials'][0]['id'] session_id = req['sessionId'] fido_req = dict() - for k, v in req.items(): - if k in ['challenge', 'timeout', 'rpId', 'allowCredentials', 'userVerification', 'extensions']: + for k, val in req.items(): + if k in ['challenge', 'timeout', 'rpId', 'allowCredentials', + 'userVerification', 'extensions']: if k == 'challenge': - fido_req[k] = websafe_decode(v) + fido_req[k] = websafe_decode(val) elif k == 'allowCredentials': fido_req[k] = [] - for c in v: - i = _copy(c) + for cred in val: + i = cred.copy() i['id'] = websafe_decode(i['id']) fido_req[k].append(i) else: - fido_req[k] = v + fido_req[k] = val # pass challenge to all devices sequentially - for device in wa_devices: - prompted = False - ok_prompted = False + while len(wa_devices) > 0: + device = wa_devices.pop(0) client = Fido2Client(device, req['extensions']['appid'], verify_rp_id) LOG.debug('trying device %s with req %s', client, fido_req) try: - if not prompted: + if device not in prompted: print('Please tap your security key...') - prompted = True + prompted.add(device) wa_resp = client.get_assertion(fido_req, on_keepalive=on_keepalive).get_response(0) LOG.debug('wa_resp: %s', wa_resp) LOG.debug('wa_resp.client_data: %s', wa_resp.client_data) @@ -178,14 +173,14 @@ def _copy(item): def b64enc(buf): return websafe_encode(buf) - def b64RawEnc(buf): + def b64_raw_enc(buf): return base64.b64encode(buf).decode('utf-8').replace('+','-').replace('/','_') - def hexEncode(buf): + def hex_encode(buf): return buf.hex() ## encode relevant fields, based on the following snippet from duo ## - """ + r""" full message: {"sid":"MGEzMDE3NGFjNmUwNDA1Yzk4MDZkNzdhOTRlODI0NWY=|104.129.198.109|1649389439|84db9f7e589d6ac2e82b135c60d06fa92dca9c75","device":"webauthn_credential","factor":"webauthn_finish","response_data":"{\\"sessionId\\":\\"AoL2xv58zp6_lMiKSobkVYNBoU9ZhlzzlBLb7uQ-VPc\\",\\"id\\":\\"dipB0Q2TgTSpOINIsI9uaesA4ZrI1nGoeKc3Dx-VOvAJ1knOY46MzjY3da14KcTzLPzlIJF9p9gtqr2t6TfWeQ\\",\\"rawId\\":\\"dipB0Q2TgTSpOINIsI9uaesA4ZrI1nGoeKc3Dx-VOvAJ1knOY46MzjY3da14KcTzLPzlIJF9p9gtqr2t6TfWeQ\\",\\"type\\":\\"public-key\\",\\"authenticatorData\\":\\"5Gq7wMECJqSk1hzZqsDdJP5jU8V79HTtuMIpu6_lawUBAAANQA==\\",\\"clientDataJSON\\":\\"eyJ0eXBlIjoid2ViYXV0aG4uZ2V0IiwiY2hhbGxlbmdlIjoiRnE1QWwxNFZnZng0UUpxRFVHQnBSNWdaQlJDUkhMdnciLCJvcmlnaW4iOiJodHRwczovL2FwaS02OTI2NzkxOC5kdW9zZWN1cml0eS5jb20iLCJjcm9zc09yaWdpbiI6ZmFsc2V9\\",\\"signature\\":\\"3046022100af6749afdca444ca389c91143b256d6fa2b3be71f2da8ec71a30661c9cd9524a022100eb5083196d0076bf7235db9a44c846cc56922a5aa3b9c135ad8b34b49aeeae31\\",\\"extensionResults\\":{\\"appid\\":false}}","out_of_date":"","days_out_of_date":"","days_to_block":"None"} parsed response_data: @@ -245,16 +240,13 @@ def hexEncode(buf): # signature is hex # resp = dict() - resp['signature'] = hexEncode(wa_resp.signature) + resp['signature'] = hex_encode(wa_resp.signature) # authenticatorData is a url-safe base64-encoded blob # - flags = wa_resp.authenticator_data.flags - counter = wa_resp.authenticator_data.counter - rpid_hash = wa_resp.authenticator_data.rp_id_hash - resp['authenticatorData'] = b64RawEnc(wa_resp.authenticator_data) + resp['authenticatorData'] = b64_raw_enc(wa_resp.authenticator_data) # clientDataJSON is a base64-encoded JSON blob # - resp['clientDataJSON'] = b64RawEnc(wa_resp.client_data) + resp['clientDataJSON'] = b64_raw_enc(wa_resp.client_data) # extensionResults needs some massaging # resp['extensionResults'] = dict(appid=False) @@ -265,14 +257,25 @@ def hexEncode(buf): resp['type'] = 'public-key' return resp - # TODO: figure out the possible errors for FIDO2 - except ClientError as e: #pylint: disable=invalid-name - if e.code == ClientError.ERR.DEVICE_INELIGIBLE and len(list(devices)) > 1: + except ClientError as err: + if err.code == ClientError.ERR.DEVICE_INELIGIBLE and len(list(wa_devices)) > 1: print('Please try another authenticator') continue - else: + if err.code == ClientError.ERR.TIMEOUT: + # this is a retryable error + print('Timeout waiting for tap, please try again') + # put it back on the head of the list so it's used again + wa_devices.insert(0, device) + continue + # other errors are OTHER_ERROR, BAD_REQUEST, and CONFIGURATION_UNSUPPORTED + # so we can safely continue on with other devices without any error handling + LOG.error(err) + continue + except KeyboardInterrupt: + answer = input('Interrupted, would you like to continue? [Y/n]') + if answer in ('Y', 'y', ''): continue - LOG.error(e) + raise finally: device.close() answer = input('No registered WebauthN device found, retry? [Y/n]') @@ -365,13 +368,13 @@ def login_two_factor(self, response_1fa, auth_device=None): app_sig = sigs[1] # Pulling the iframe into the page - frame_url = 'https://%s/frame/web/v1/auth?tx=%s&parent=%s&v=2.3' % \ - (duo_host, duo_sig, response_1fa.url) + frame_url = f'https://{duo_host}/frame/web/v1/auth' + \ + f'?tx={duo_sig}&parent={response_1fa.url}&v=2.3' LOG.info('Getting Duo iframe') # if the duo integration has an allowed origin list, we must # pass the page URL as a Referer header in addition to using # the `parent` query parameter in the frame URL - origin_duo_host = 'https://%s' % duo_host + origin_duo_host = f'https://{duo_host}' (response, soup) = self._do_get(frame_url, headers={ 'Referer': response_1fa.url, 'Origin': origin_duo_host @@ -409,12 +412,12 @@ def login_two_factor(self, response_1fa, auth_device=None): 'Origin': origin_duo_host} if do_wa: # pull in the webauthn prompt - popup_url = 'https://%s%s/webauthn_auth_popup?sid=%s&wkey=%s' % (duo_host, new_action, sid, device.value) + popup_url = f'https://{duo_host}{new_action}/' + \ + f'webauthn_auth_popup?sid={sid}&wkey={device.value}' LOG.debug("Popup URL: %s", popup_url) (status, soup) = self._do_get( popup_url, headers=headers) - xsrf_token = soup.find('input').get('value', '') payload = { 'sid': sid, 'device': device.value, @@ -424,18 +427,12 @@ def login_two_factor(self, response_1fa, auth_device=None): } headers = {'Referer': popup_url, 'Origin': origin_duo_host} - #headers = {'Referer': prompt_sid_url} - #headers = {'Referer': prompt_sid_url, 'Origin': duo_host} - (status, _) = self._do_post( - 'https://%s%s' % (duo_host, new_action), - data=payload, - headers=headers, - soup=False) + (status, _) = self._do_post(f'https://{duo_host}{new_action}', data=payload, + headers=headers, soup=False) # Response is of form # {"stat": "OK", "response": {"txid": "f95cbacc-151c-43a6-b462-b33420e72633"}} LOG.debug("Received response: %s", status.text) - txid = '' response = json.loads(status.text)['response'] txid = response['txid'] LOG.debug("Received response %s", response) @@ -443,11 +440,8 @@ def login_two_factor(self, response_1fa, auth_device=None): headers = {'Referer': prompt_sid_url, 'Origin': origin_duo_host} # Initial call will NOT block - (status, _) = self._do_post( - 'https://%s/frame/status' % duo_host, - data={'sid': sid, 'txid': txid}, - headers=headers, - soup=False) + (status, _) = self._do_post(f'https://{duo_host}/frame/status', + data={'sid': sid, 'txid': txid}, headers=headers, soup=False) # text from this will be something like # { # "stat": "OK", @@ -473,10 +467,15 @@ def login_two_factor(self, response_1fa, auth_device=None): # 'status_code': 'webauthn_sent', # 'status_body_msg': 'Use your security key to log in.', # 'webauthn_credential_request_options': { - # 'allowCredentials': [{'transports': ['usb', 'nfc', 'ble'], 'type': 'public-key', 'id': 'dipB0Q2TgTSpOINIsI9uaesA4ZrI1nGoeKc3Dx-VOvAJ1knOY46MzjY3da14KcTzLPzlIJF9p9gtqr2t6TfWeQ'}], + # 'allowCredentials': [{'transports': ['usb', 'nfc', 'ble'], + # 'type': 'public-key', 'id': + # 'dipB0Q2TgTSpOINIsI9uaesA4ZrI1nGoeKc3Dx-VOvAJ1knOY46MzjY3da14KcTzLPzlIJF9p9gtqr2t6TfWeQ' + # }], # 'challenge': 'jUXmEDWAxx7b3jPxu57vGu7xvfWAulE8', 'rpId': 'duosecurity.com', # 'timeout': 60000, 'sessionId': 'dn6WlN9Uunff3ZLSZuu9bdHTr1Nhj0p7Ov89ZcR77nI', - # 'userVerification': 'discouraged', 'extensions': {'appid': 'https://api-69267918.duosecurity.com'} + # 'userVerification': 'discouraged', 'extensions': { + # 'appid': 'https://api-69267918.duosecurity.com' + # } # } # } # } @@ -513,11 +512,8 @@ def login_two_factor(self, response_1fa, auth_device=None): LOG.debug(payload) - (status, _) = self._do_post( - 'https://%s%s' % (duo_host, new_action), - data=payload, - headers=headers, - soup=False) + (status, _) = self._do_post(f'https://{duo_host}{new_action}', data=payload, + headers=headers, soup=False) status_data = json.loads(status.text) # Response is of form # {"stat": "OK", "response": {"txid": "f95cbacc-151c-43a6-b462-b33420e72633"}} @@ -525,11 +521,8 @@ def login_two_factor(self, response_1fa, auth_device=None): LOG.debug("Received transaction ID %s from response %s", txid, status.text) # Initial call will NOT block - (status, _) = self._do_post( - 'https://%s/frame/status' % duo_host, - data={'sid': sid, 'txid': txid}, - headers=headers, - soup=False) + (status, _) = self._do_post(f'https://{duo_host}/frame/status', + data={'sid': sid, 'txid': txid}, headers=headers, soup=False) status_data = json.loads(status.text) LOG.info(str(status_data)) if status_data['stat'] != 'OK': @@ -544,10 +537,8 @@ def login_two_factor(self, response_1fa, auth_device=None): # call again to get status of request # for a push notification, this will hang until the user approves/denies # for a phone call, you need to keep polling until the user approves/denies - (status, _) = self._do_post( - 'https://%s/frame/status' % duo_host, - data={'sid': sid, 'txid': txid}, - soup=False) + (status, _) = self._do_post('https://{duo_host}/frame/status', + data={'sid': sid, 'txid': txid}, soup=False) status_data = json.loads(status.text) if status_data['stat'] != 'OK': @@ -569,10 +560,8 @@ def login_two_factor(self, response_1fa, auth_device=None): # We have to specifically ask Duo for the signed auth string; # this doesn't come for free anymore (postresult, _) = self._do_post( - 'https://%s%s' % (duo_host, status_data['response']['result_url']), - data={'sid': sid}, - soup=False - ) + f'https://{duo_host}{status_data["response"]["result_url"]}', + data={'sid': sid}, soup=False) postresult_data = json.loads(postresult.text) signed_auth = postresult_data['response']['cookie'] elif 'cookie' in status_data['response']: @@ -583,11 +572,9 @@ def login_two_factor(self, response_1fa, auth_device=None): payload = { '_eventId_proceed': 'transition', - 'sig_response': '%s:%s' % (signed_auth, app_sig) + 'sig_response': f'{signed_auth}:{app_sig}' } - (response, soup) = self._do_post( - response_1fa.url, - data=payload) + (response, soup) = self._do_post(response_1fa.url, data=payload) assertion = self._get_assertion(soup) return (True, assertion) From 515c5f7b500f3bfbbea6cd646b96cd26c6886e7a Mon Sep 17 00:00:00 2001 From: Greg Cochard Date: Wed, 1 Jun 2022 12:44:54 -0700 Subject: [PATCH 3/6] Fix off-by-one error --- alohomora/req.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alohomora/req.py b/alohomora/req.py index 0773320..8331edf 100644 --- a/alohomora/req.py +++ b/alohomora/req.py @@ -258,7 +258,7 @@ def hex_encode(buf): return resp except ClientError as err: - if err.code == ClientError.ERR.DEVICE_INELIGIBLE and len(list(wa_devices)) > 1: + if err.code == ClientError.ERR.DEVICE_INELIGIBLE and len(list(wa_devices)) > 0: print('Please try another authenticator') continue if err.code == ClientError.ERR.TIMEOUT: From f07772f5ba2aa2a03eb72f5bdd9a619f1a4be6ba Mon Sep 17 00:00:00 2001 From: Greg Cochard Date: Thu, 2 Jun 2022 08:42:30 -0700 Subject: [PATCH 4/6] Give some whitespace to prompts --- alohomora/req.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/alohomora/req.py b/alohomora/req.py index 8331edf..294225b 100644 --- a/alohomora/req.py +++ b/alohomora/req.py @@ -272,13 +272,13 @@ def hex_encode(buf): LOG.error(err) continue except KeyboardInterrupt: - answer = input('Interrupted, would you like to continue? [Y/n]') + answer = input('Interrupted, would you like to continue? [Y/n] ') if answer in ('Y', 'y', ''): continue raise finally: device.close() - answer = input('No registered WebauthN device found, retry? [Y/n]') + answer = input('No registered WebauthN device found, retry? [Y/n] ') if answer in ('Y', 'y', ''): return self._get_webauthn_response(req) raise RuntimeWarning('No registered WebauthN device found') From 8305b88f81c7a24edf9c626f29dc11a3d3b10f00 Mon Sep 17 00:00:00 2001 From: Greg Cochard Date: Thu, 2 Jun 2022 09:05:37 -0700 Subject: [PATCH 5/6] Fix missed f-string --- alohomora/req.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alohomora/req.py b/alohomora/req.py index 294225b..be0c576 100644 --- a/alohomora/req.py +++ b/alohomora/req.py @@ -537,7 +537,7 @@ def login_two_factor(self, response_1fa, auth_device=None): # call again to get status of request # for a push notification, this will hang until the user approves/denies # for a phone call, you need to keep polling until the user approves/denies - (status, _) = self._do_post('https://{duo_host}/frame/status', + (status, _) = self._do_post(f'https://{duo_host}/frame/status', data={'sid': sid, 'txid': txid}, soup=False) status_data = json.loads(status.text) From c553039faf595a23bd9eddd8cbc6114917860b3e Mon Sep 17 00:00:00 2001 From: Greg Cochard Date: Thu, 2 Jun 2022 09:16:03 -0700 Subject: [PATCH 6/6] Parens instead of escaped newlines --- alohomora/req.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/alohomora/req.py b/alohomora/req.py index be0c576..1de01e5 100644 --- a/alohomora/req.py +++ b/alohomora/req.py @@ -368,8 +368,8 @@ def login_two_factor(self, response_1fa, auth_device=None): app_sig = sigs[1] # Pulling the iframe into the page - frame_url = f'https://{duo_host}/frame/web/v1/auth' + \ - f'?tx={duo_sig}&parent={response_1fa.url}&v=2.3' + frame_url = (f'https://{duo_host}/frame/web/v1/auth' + f'?tx={duo_sig}&parent={response_1fa.url}&v=2.3') LOG.info('Getting Duo iframe') # if the duo integration has an allowed origin list, we must # pass the page URL as a Referer header in addition to using @@ -412,8 +412,8 @@ def login_two_factor(self, response_1fa, auth_device=None): 'Origin': origin_duo_host} if do_wa: # pull in the webauthn prompt - popup_url = f'https://{duo_host}{new_action}/' + \ - f'webauthn_auth_popup?sid={sid}&wkey={device.value}' + popup_url = (f'https://{duo_host}{new_action}/' + f'webauthn_auth_popup?sid={sid}&wkey={device.value}') LOG.debug("Popup URL: %s", popup_url) (status, soup) = self._do_get( popup_url,