From d90c40bc9f1220d74769b100c3cbac076bce61be Mon Sep 17 00:00:00 2001 From: Mohamed ElKalioby Date: Mon, 27 May 2024 08:53:02 +0300 Subject: [PATCH] Merged v2.9.0 --- .gitignore | 1 + .pre-commit-config.yaml | 16 ++ .pyre_configuration | 15 ++ CHANGELOG.md | 7 + README.md | 3 + example/example/auth.py | 34 ++-- example/example/settings.py | 139 ++++++------- example/example/urls.py | 19 +- example/example/views.py | 5 +- mfa/Common.py | 21 +- mfa/Email.py | 105 ++++++---- mfa/FIDO2.py | 185 ++++++++++++------ mfa/TrustedDevice.py | 161 +++++++++------ mfa/U2F.py | 156 ++++++++++----- mfa/__init__.py | 2 +- mfa/apps.py | 6 +- mfa/helpers.py | 56 ++++-- mfa/middleware.py | 26 ++- mfa/migrations/0001_initial.py | 22 ++- mfa/migrations/0002_user_keys_key_type.py | 9 +- mfa/migrations/0003_auto_20181114_2159.py | 7 +- mfa/migrations/0004_user_keys_enabled.py | 7 +- mfa/migrations/0005_auto_20181115_2014.py | 21 +- mfa/migrations/0006_trusted_devices.py | 29 +-- mfa/migrations/0007_auto_20181230_1549.py | 9 +- mfa/migrations/0008_user_keys_last_used.py | 7 +- .../0009_user_keys_owned_by_enterprise.py | 15 +- mfa/migrations/0010_auto_20201110_0557.py | 9 +- mfa/migrations/0011_auto_20210530_0622.py | 7 +- mfa/models.py | 58 +++--- mfa/recovery.py | 120 +++++++----- mfa/totp.py | 133 ++++++++----- mfa/urls.py | 103 +++++----- mfa/views.py | 99 ++++++---- requirements.txt | 5 +- setup.py | 41 ++-- 36 files changed, 1021 insertions(+), 637 deletions(-) create mode 100644 .pre-commit-config.yaml create mode 100644 .pyre_configuration diff --git a/.gitignore b/.gitignore index f3d5cc3..46d2341 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .idea +.pyre/ example/venv # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..0e7cb07 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,16 @@ +repos: + # Using this mirror lets us use mypyc-compiled black, which is about 2x faster + - repo: https://github.com/psf/black-pre-commit-mirror + rev: 23.12.1 + hooks: + - id: black + # It is recommended to specify the latest version of Python + # supported by your project here, or alternatively use + # pre-commit's default_language_version, see + # https://pre-commit.com/#top_level-default_language_version + language_version: python3.11 + + - repo: https://github.com/fsouza/pre-commit-pyre-check + rev: '199dc59' # Use the sha / tag you want to point at + hooks: + - id: pyre-check diff --git a/.pyre_configuration b/.pyre_configuration new file mode 100644 index 0000000..591ec25 --- /dev/null +++ b/.pyre_configuration @@ -0,0 +1,15 @@ +{ + "source_directories": [ + "." + ], + "search_path": [ + "env/lib/python3.11/site-packages/" + ], + "ignore_all_errors":[ + "*env*/*", + "example/venv/*", +"build/*", +"example/*" + + ] +} diff --git a/CHANGELOG.md b/CHANGELOG.md index f301507..bd1cda5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,11 @@ # Change Log + +## 2.9.0 +* Add: Set black as code formatter +* Add: Add Pyre as a type checker +* Add: Add pre-commit hooks +* Upgrade: fido to be 1.1.0 as minimum + ## 2.8.0 * Support For Django 4.0+ JSONField * Removed jsonfield package from requirements diff --git a/README.md b/README.md index 2cae41f..8e1aadb 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,9 @@ A Django app that handles MFA, it supports TOTP, U2F, FIDO2 U2F (Web Authn), Email Tokens , Trusted Devices and backup codes. [![Works with PassKeys](https://github.com/mkalioby/django-mfa2/raw/master/img/Works%20with%20PassKeys-black.png)](https://fidoalliance.org/passkeys/) + +[![Code Style Black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) +[![Type Checker By Pyre](https://img.shields.io/badge/type%20checker-pyre-orange)](https://pyre-check.org/) ### Pip Stats [![PyPI version](https://badge.fury.io/py/django-mfa2.svg)](https://badge.fury.io/py/django-mfa2) [![Downloads Count](https://static.pepy.tech/personalized-badge/django-mfa2?period=total&units=international_system&left_color=black&right_color=green&left_text=Downloads)](https://pepy.tech/project/django-mfa2) diff --git a/example/example/auth.py b/example/example/auth.py index afd587b..6d6fda6 100644 --- a/example/example/auth.py +++ b/example/example/auth.py @@ -1,30 +1,36 @@ from django.shortcuts import render from django.http import HttpResponseRedirect from django.urls import reverse -from django.contrib.auth import authenticate,login,logout +from django.contrib.auth import authenticate, login, logout from django.contrib.auth.models import User + + def loginView(request): - context={} - if request.method=="POST": - username=request.POST["username"] - password=request.POST["password"] - user=authenticate(username=username,password=password) + context = {} + if request.method == "POST": + username = request.POST["username"] + password = request.POST["password"] + user = authenticate(username=username, password=password) if user: from mfa.helpers import has_mfa - res = has_mfa(username = username, request = request) # has_mfa returns false or HttpResponseRedirect + + res = has_mfa( + username=username, request=request + ) # has_mfa returns false or HttpResponseRedirect if res: return res - return create_session(request,user.username) - context["invalid"]=True + return create_session(request, user.username) + context["invalid"] = True return render(request, "login.html", context) -def create_session(request,username): - user=User.objects.get(username=username) - user.backend='django.contrib.auth.backends.ModelBackend' + +def create_session(request, username): + user = User.objects.get(username=username) + user.backend = "django.contrib.auth.backends.ModelBackend" login(request, user) - return HttpResponseRedirect(reverse('home')) + return HttpResponseRedirect(reverse("home")) def logoutView(request): logout(request) - return render(request,"logout.html",{}) \ No newline at end of file + return render(request, "logout.html", {}) diff --git a/example/example/settings.py b/example/example/settings.py index a341a6c..5c45991 100644 --- a/example/example/settings.py +++ b/example/example/settings.py @@ -21,65 +21,65 @@ # See https://docs.djangoproject.com/en/2.0/howto/deployment/checklist/ # SECURITY WARNING: keep the secret key used in production secret! -SECRET_KEY = '#9)q!_i3@pr-^3oda(e^3$x!kq3b4f33#5l@+=+&vuz+p6gb3g' +SECRET_KEY = "#9)q!_i3@pr-^3oda(e^3$x!kq3b4f33#5l@+=+&vuz+p6gb3g" # SECURITY WARNING: don't run with debug turned on in production! DEBUG = True -ALLOWED_HOSTS = ['*'] +ALLOWED_HOSTS = ["*"] # Application definition INSTALLED_APPS = [ - 'django.contrib.admin', - 'django.contrib.auth', - 'django.contrib.contenttypes', - 'django.contrib.sessions', - 'django.contrib.messages', - 'django.contrib.staticfiles', - 'mfa', - 'sslserver' + "django.contrib.admin", + "django.contrib.auth", + "django.contrib.contenttypes", + "django.contrib.sessions", + "django.contrib.messages", + "django.contrib.staticfiles", + "mfa", + "sslserver", ] MIDDLEWARE = [ - 'django.middleware.security.SecurityMiddleware', - 'django.contrib.sessions.middleware.SessionMiddleware', - 'django.middleware.common.CommonMiddleware', - 'django.middleware.csrf.CsrfViewMiddleware', - 'django.contrib.auth.middleware.AuthenticationMiddleware', - 'django.contrib.messages.middleware.MessageMiddleware', - 'django.middleware.clickjacking.XFrameOptionsMiddleware', + "django.middleware.security.SecurityMiddleware", + "django.contrib.sessions.middleware.SessionMiddleware", + "django.middleware.common.CommonMiddleware", + "django.middleware.csrf.CsrfViewMiddleware", + "django.contrib.auth.middleware.AuthenticationMiddleware", + "django.contrib.messages.middleware.MessageMiddleware", + "django.middleware.clickjacking.XFrameOptionsMiddleware", ] -ROOT_URLCONF = 'example.urls' +ROOT_URLCONF = "example.urls" TEMPLATES = [ { - 'BACKEND': 'django.template.backends.django.DjangoTemplates', - 'DIRS': [os.path.join(BASE_DIR ,'example','templates' )], - 'APP_DIRS': True, - 'OPTIONS': { - 'context_processors': [ - 'django.template.context_processors.debug', - 'django.template.context_processors.request', - 'django.contrib.auth.context_processors.auth', - 'django.contrib.messages.context_processors.messages', + "BACKEND": "django.template.backends.django.DjangoTemplates", + "DIRS": [os.path.join(BASE_DIR, "example", "templates")], + "APP_DIRS": True, + "OPTIONS": { + "context_processors": [ + "django.template.context_processors.debug", + "django.template.context_processors.request", + "django.contrib.auth.context_processors.auth", + "django.contrib.messages.context_processors.messages", ], }, }, ] -WSGI_APPLICATION = 'example.wsgi.application' +WSGI_APPLICATION = "example.wsgi.application" # Database # https://docs.djangoproject.com/en/2.0/ref/settings/#databases DATABASES = { - 'default': { - 'ENGINE': 'django.db.backends.sqlite3', - 'NAME': 'test_db', + "default": { + "ENGINE": "django.db.backends.sqlite3", + "NAME": "test_db", } } @@ -89,16 +89,16 @@ AUTH_PASSWORD_VALIDATORS = [ { - 'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator', + "NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator", }, { - 'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator', + "NAME": "django.contrib.auth.password_validation.MinimumLengthValidator", }, { - 'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator', + "NAME": "django.contrib.auth.password_validation.CommonPasswordValidator", }, { - 'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator', + "NAME": "django.contrib.auth.password_validation.NumericPasswordValidator", }, ] @@ -106,9 +106,9 @@ # Internationalization # https://docs.djangoproject.com/en/2.0/topics/i18n/ -LANGUAGE_CODE = 'en-us' +LANGUAGE_CODE = "en-us" -TIME_ZONE = 'UTC' +TIME_ZONE = "UTC" USE_I18N = True @@ -120,37 +120,38 @@ # Static files (CSS, JavaScript, Images) # https://docs.djangoproject.com/en/2.0/howto/static-files/ -STATIC_URL = '/static/' -#STATIC_ROOT=(os.path.join(BASE_DIR,'static')) -STATICFILES_DIRS=[os.path.join(BASE_DIR,'static')] -LOGIN_URL="/auth/login" - -EMAIL_FROM='Test App' -EMAIL_HOST="smtp.gmail.com" -EMAIL_PORT=587 -EMAIL_HOST_USER="" -EMAIL_HOST_PASSWORD='' -EMAIL_USE_TLS=True - - - -MFA_UNALLOWED_METHODS=() # Methods that shouldn't be allowed for the user -MFA_LOGIN_CALLBACK="example.auth.create_session" # A function that should be called by username to login the user in session -MFA_RECHECK=True # Allow random rechecking of the user -MFA_RECHECK_MIN=10 # Minimum interval in seconds -MFA_RECHECK_MAX=30 # Maximum in seconds -MFA_QUICKLOGIN=True # Allow quick login for returning users by provide only their 2FA -MFA_HIDE_DISABLE=('',) # Can the user disable his key (Added in 1.2.0). -MFA_REDIRECT_AFTER_REGISTRATION="registered" -MFA_SUCCESS_REGISTRATION_MSG="Go to Home" +STATIC_URL = "/static/" +# STATIC_ROOT=(os.path.join(BASE_DIR,'static')) +STATICFILES_DIRS = [os.path.join(BASE_DIR, "static")] +LOGIN_URL = "/auth/login" + +EMAIL_FROM = "Test App" +EMAIL_HOST = "smtp.gmail.com" +EMAIL_PORT = 587 +EMAIL_HOST_USER = "" +EMAIL_HOST_PASSWORD = "" +EMAIL_USE_TLS = True + + +MFA_UNALLOWED_METHODS = () # Methods that shouldn't be allowed for the user +MFA_LOGIN_CALLBACK = "example.auth.create_session" # A function that should be called by username to login the user in session +MFA_RECHECK = True # Allow random rechecking of the user +MFA_RECHECK_MIN = 10 # Minimum interval in seconds +MFA_RECHECK_MAX = 30 # Maximum in seconds +MFA_QUICKLOGIN = True # Allow quick login for returning users by provide only their 2FA +MFA_HIDE_DISABLE = ("",) # Can the user disable his key (Added in 1.2.0). +MFA_REDIRECT_AFTER_REGISTRATION = "registered" +MFA_SUCCESS_REGISTRATION_MSG = "Go to Home" MFA_ALWAYS_GO_TO_LAST_METHOD = True MFA_ENFORCE_RECOVERY_METHOD = True -MFA_RENAME_METHODS = {"RECOVERY":"Backup Codes","FIDO2":"Biometric Authentication"} -PASSWORD_HASHERS = DEFAULT_PASSWORD_HASHERS #Comment if PASSWORD_HASHER already set -PASSWORD_HASHERS += ['mfa.recovery.Hash'] -RECOVERY_ITERATION = 1 #Number of iteration for recovery code, higher is more secure, but uses more resources for generation and check... -TOKEN_ISSUER_NAME="PROJECT_NAME" #TOTP Issuer name - -U2F_APPID="https://localhost:9000" #URL For U2F -FIDO_SERVER_ID="localhost" # Server rp id for FIDO2, it the full domain of your project -FIDO_SERVER_NAME="TestApp" +MFA_RENAME_METHODS = {"RECOVERY": "Backup Codes", "FIDO2": "Biometric Authentication"} +PASSWORD_HASHERS = DEFAULT_PASSWORD_HASHERS # Comment if PASSWORD_HASHER already set +PASSWORD_HASHERS += ["mfa.recovery.Hash"] +RECOVERY_ITERATION = 1 # Number of iteration for recovery code, higher is more secure, but uses more resources for generation and check... +TOKEN_ISSUER_NAME = "PROJECT_NAME" # TOTP Issuer name + +U2F_APPID = "https://localhost:9000" # URL For U2F +FIDO_SERVER_ID = ( + "localhost" # Server rp id for FIDO2, it the full domain of your project +) +FIDO_SERVER_NAME = "TestApp" diff --git a/example/example/urls.py b/example/example/urls.py index 91c6937..d6e5556 100644 --- a/example/example/urls.py +++ b/example/example/urls.py @@ -14,15 +14,16 @@ 2. Add a URL to urlpatterns: path('blog/', include('blog.urls')) """ from django.contrib import admin -from django.urls import path,re_path,include -from . import views,auth +from django.urls import path, re_path, include +from . import views, auth from mfa import TrustedDevice + urlpatterns = [ - path('admin/', admin.site.urls), - path('mfa/', include('mfa.urls')), - path('auth/login',auth.loginView,name="login"), - path('auth/logout',auth.logoutView,name="logout"), - path('devices/add/', TrustedDevice.add,name="add_trusted_device"), - re_path('^$',views.home,name='home'), - path('registered/',views.registered,name='registered') + path("admin/", admin.site.urls), + path("mfa/", include("mfa.urls")), + path("auth/login", auth.loginView, name="login"), + path("auth/logout", auth.logoutView, name="logout"), + path("devices/add/", TrustedDevice.add, name="add_trusted_device"), + re_path("^$", views.home, name="home"), + path("registered/", views.registered, name="registered"), ] diff --git a/example/example/views.py b/example/example/views.py index 98e80e9..de1f3aa 100644 --- a/example/example/views.py +++ b/example/example/views.py @@ -4,8 +4,9 @@ @login_required() def home(request): - return render(request,"home.html",{}) + return render(request, "home.html", {}) + @login_required() def registered(request): - return render(request,"home.html",{"registered":True}) + return render(request, "home.html", {"registered": True}) diff --git a/mfa/Common.py b/mfa/Common.py index 1504340..2263ba9 100644 --- a/mfa/Common.py +++ b/mfa/Common.py @@ -1,19 +1,26 @@ from django.conf import settings from django.core.mail import EmailMessage + try: from django.urls import reverse -except: - from django.core.urlresolver import reverse +except ImportError: + from django.core.urlresolver import reverse # pyre-ignore[21] + -def send(to,subject,body): +def send(to, subject, body): from_email_address = settings.EMAIL_HOST_USER - if '@' not in from_email_address: + if "@" not in from_email_address: from_email_address = settings.DEFAULT_FROM_EMAIL From = "%s <%s>" % (settings.EMAIL_FROM, from_email_address) - email = EmailMessage(subject,body,From,to) + email = EmailMessage(subject, body, From, to) email.content_subtype = "html" return email.send(False) + def get_redirect_url(): - return {"redirect_html": reverse(getattr(settings, 'MFA_REDIRECT_AFTER_REGISTRATION', 'mfa_home')), - "reg_success_msg":getattr(settings,"MFA_SUCCESS_REGISTRATION_MSG")} + return { + "redirect_html": reverse( + getattr(settings, "MFA_REDIRECT_AFTER_REGISTRATION", "mfa_home") + ), + "reg_success_msg": getattr(settings, "MFA_SUCCESS_REGISTRATION_MSG"), + } diff --git a/mfa/Email.py b/mfa/Email.py index 047ec7a..0b3fdb2 100644 --- a/mfa/Email.py +++ b/mfa/Email.py @@ -1,72 +1,109 @@ +import datetime +from random import randint from django.shortcuts import render from django.views.decorators.cache import never_cache from django.template.context_processors import csrf -import datetime,random -from random import randint -from .models import * -#from django.template.context import RequestContext +from django.contrib.auth import get_user_model +from django.http import HttpResponseRedirect +from django.conf import settings + +from .models import User_Keys + from .views import login from .Common import send -def sendEmail(request,username,secret): + +def sendEmail(request, username, secret): """Send Email to the user after rendering `mfa_email_token_template`""" - from django.contrib.auth import get_user_model + User = get_user_model() - key = getattr(User, 'USERNAME_FIELD', 'username') + key = getattr(User, "USERNAME_FIELD", "username") kwargs = {key: username} user = User.objects.get(**kwargs) - res=render(request,"mfa_email_token_template.html",{"request":request,"user":user,'otp':secret}) - return send([user.email],"OTP", res.content.decode()) + res = render( + request, + "mfa_email_token_template.html", + {"request": request, "user": user, "otp": secret}, + ) + return send([user.email], "OTP", res.content.decode()) + @never_cache def start(request): """Start adding email as a 2nd factor""" context = csrf(request) if request.method == "POST": - if request.session["email_secret"] == request.POST["otp"]: #if successful - uk=User_Keys() - uk.username=request.user.username - uk.key_type="Email" - uk.enabled=1 + if request.session["email_secret"] == request.POST["otp"]: # if successful + uk = User_Keys() + uk.username = request.user.username + uk.key_type = "Email" + uk.enabled = 1 uk.save() - from django.http import HttpResponseRedirect + try: - from django.core.urlresolvers import reverse + from django.core.urlresolvers import reverse # pyre-ignore[21] except: from django.urls import reverse - if getattr(settings, 'MFA_ENFORCE_RECOVERY_METHOD', False) and not User_Keys.objects.filter( - key_type="RECOVERY", username=request.user.username).exists(): - request.session["mfa_reg"] = {"method": "Email", - "name": getattr(settings, "MFA_RENAME_METHODS", {}).get("Email", "Email")} + if ( + getattr(settings, "MFA_ENFORCE_RECOVERY_METHOD", False) + and not User_Keys.objects.filter( + key_type="RECOVERY", username=request.user.username + ).exists() + ): + request.session["mfa_reg"] = { + "method": "Email", + "name": getattr(settings, "MFA_RENAME_METHODS", {}).get( + "Email", "Email" + ), + } else: - return HttpResponseRedirect(reverse(getattr(settings,'MFA_REDIRECT_AFTER_REGISTRATION','mfa_home'))) + return HttpResponseRedirect( + reverse( + getattr(settings, "MFA_REDIRECT_AFTER_REGISTRATION", "mfa_home") + ) + ) context["invalid"] = True else: - request.session["email_secret"] = str(randint(0,100000)) #generate a random integer + request.session["email_secret"] = str( + randint(0, 100000) + ) # generate a random integer if sendEmail(request, request.user.username, request.session["email_secret"]): context["sent"] = True - return render(request,"Email/Add.html", context) + return render(request, "Email/Add.html", context) + + @never_cache def auth(request): """Authenticating the user by email.""" - context=csrf(request) - if request.method=="POST": - if request.session["email_secret"]==request.POST["otp"].strip(): - uk = User_Keys.objects.get(username=request.session["base_username"], key_type="Email") - mfa = {"verified": True, "method": "Email","id":uk.id} + context = csrf(request) + if request.method == "POST": + if request.session["email_secret"] == request.POST["otp"].strip(): + uk = User_Keys.objects.get( + username=request.session["base_username"], key_type="Email" + ) + mfa = {"verified": True, "method": "Email", "id": uk.id} if getattr(settings, "MFA_RECHECK", False): - mfa["next_check"] = datetime.datetime.timestamp(datetime.datetime.now() + datetime.timedelta( - seconds = random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX))) + mfa["next_check"] = datetime.datetime.timestamp( + datetime.datetime.now() + + datetime.timedelta( + seconds=random.randint( + settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX + ) + ) + ) request.session["mfa"] = mfa from django.utils import timezone - uk.last_used=timezone.now() + + uk.last_used = timezone.now() uk.save() return login(request) - context["invalid"]=True + context["invalid"] = True else: request.session["email_secret"] = str(randint(0, 100000)) - if sendEmail(request, request.session["base_username"], request.session["email_secret"]): + if sendEmail( + request, request.session["base_username"], request.session["email_secret"] + ): context["sent"] = True - return render(request,"Email/Auth.html", context) + return render(request, "Email/Auth.html", context) diff --git a/mfa/FIDO2.py b/mfa/FIDO2.py index bd78ef0..c02c866 100644 --- a/mfa/FIDO2.py +++ b/mfa/FIDO2.py @@ -1,23 +1,23 @@ -from fido2.client import Fido2Client from fido2.server import Fido2Server, PublicKeyCredentialRpEntity from fido2.webauthn import AttestationObject, AuthenticatorData, CollectedClientData from django.template.context_processors import csrf from django.views.decorators.csrf import csrf_exempt from django.shortcuts import render -# from django.template.context import RequestContext + import simplejson from fido2 import cbor from django.http import HttpResponse from django.conf import settings -from .models import * from fido2.utils import websafe_decode, websafe_encode from fido2.webauthn import AttestedCredentialData from .views import login, reset_cookie +from .models import User_Keys import datetime from .Common import get_redirect_url from django.utils import timezone from django.http import JsonResponse + def recheck(request): """Starts FIDO2 recheck""" context = csrf(request) @@ -28,21 +28,28 @@ def recheck(request): def getServer(): """Get Server Info from settings and returns a Fido2Server""" - rp = PublicKeyCredentialRpEntity(id=settings.FIDO_SERVER_ID, name=settings.FIDO_SERVER_NAME) + rp = PublicKeyCredentialRpEntity( + id=settings.FIDO_SERVER_ID, name=settings.FIDO_SERVER_NAME + ) return Fido2Server(rp) def begin_registeration(request): """Starts registering a new FIDO Device, called from API""" server = getServer() - registration_data, state = server.register_begin({ - u'id': request.user.username.encode("utf8"), - u'name': (request.user.first_name + " " + request.user.last_name), - u'displayName': request.user.username, - }, getUserCredentials(request.user.username)) - request.session['fido_state'] = state + registration_data, state = server.register_begin( + { + "id": request.user.username.encode("utf8"), + "name": (request.user.first_name + " " + request.user.last_name), + "displayName": request.user.username, + }, + getUserCredentials(request.user.username), + ) + request.session["fido_state"] = state - return HttpResponse(cbor.encode(registration_data), content_type = 'application/octet-stream') + return HttpResponse( + cbor.encode(registration_data), content_type="application/octet-stream" + ) @csrf_exempt @@ -50,53 +57,81 @@ def complete_reg(request): """Completes the registeration, called by API""" try: if not "fido_state" in request.session: - return JsonResponse({'status': 'ERR', "message": "FIDO Status can't be found, please try again"}) + return JsonResponse( + { + "status": "ERR", + "message": "FIDO Status can't be found, please try again", + } + ) data = cbor.decode(request.body) - client_data = CollectedClientData(data['clientDataJSON']) - att_obj = AttestationObject((data['attestationObject'])) + client_data = CollectedClientData(data["clientDataJSON"]) + att_obj = AttestationObject((data["attestationObject"])) server = getServer() auth_data = server.register_complete( - request.session.pop('fido_state'), - client_data, - att_obj + request.session.pop("fido_state"), client_data, att_obj ) encoded = websafe_encode(auth_data.credential_data) uk = User_Keys() uk.username = request.user.username - uk.properties = {"device": encoded, "type": att_obj.fmt, } + uk.properties = { + "device": encoded, + "type": att_obj.fmt, + } uk.owned_by_enterprise = getattr(settings, "MFA_OWNED_BY_ENTERPRISE", False) uk.key_type = "FIDO2" uk.save() - if getattr(settings, 'MFA_ENFORCE_RECOVERY_METHOD', False) and not User_Keys.objects.filter(key_type = "RECOVERY", username=request.user.username).exists(): - request.session["mfa_reg"] = {"method":"FIDO2","name": getattr(settings, "MFA_RENAME_METHODS", {}).get("FIDO2", "FIDO2")} - return HttpResponse(simplejson.dumps({'status': 'RECOVERY'})) + if ( + getattr(settings, "MFA_ENFORCE_RECOVERY_METHOD", False) + and not User_Keys.objects.filter( + key_type="RECOVERY", username=request.user.username + ).exists() + ): + request.session["mfa_reg"] = { + "method": "FIDO2", + "name": getattr(settings, "MFA_RENAME_METHODS", {}).get( + "FIDO2", "FIDO2" + ), + } + return HttpResponse(simplejson.dumps({"status": "RECOVERY"})) else: - return HttpResponse(simplejson.dumps({'status': 'OK'})) + return HttpResponse(simplejson.dumps({"status": "OK"})) except Exception as exp: import traceback + print(traceback.format_exc()) try: from raven.contrib.django.raven_compat.models import client + client.captureException() except: pass - return JsonResponse({'status': 'ERR', "message": "Error on server, please try again later"}) + return JsonResponse( + {"status": "ERR", "message": "Error on server, please try again later"} + ) def start(request): """Start Registration a new FIDO Token""" context = csrf(request) context.update(get_redirect_url()) - context["method"] = {"name":getattr(settings,"MFA_RENAME_METHODS",{}).get("FIDO2","FIDO2 Security Key")} - context["RECOVERY_METHOD"]=getattr(settings,"MFA_RENAME_METHODS",{}).get("RECOVERY","Recovery codes") + context["method"] = { + "name": getattr(settings, "MFA_RENAME_METHODS", {}).get( + "FIDO2", "FIDO2 Security Key" + ) + } + context["RECOVERY_METHOD"] = getattr(settings, "MFA_RENAME_METHODS", {}).get( + "RECOVERY", "Recovery codes" + ) return render(request, "FIDO2/Add.html", context) def getUserCredentials(username): credentials = [] - for uk in User_Keys.objects.filter(username = username, key_type = "FIDO2"): - credentials.append(AttestedCredentialData(websafe_decode(uk.properties["device"]))) + for uk in User_Keys.objects.filter(username=username, key_type="FIDO2"): + credentials.append( + AttestedCredentialData(websafe_decode(uk.properties["device"])) + ) return credentials @@ -107,10 +142,12 @@ def auth(request): def authenticate_begin(request): server = getServer() - credentials = getUserCredentials(request.session.get("base_username", request.user.username)) + credentials = getUserCredentials( + request.session.get("base_username", request.user.username) + ) auth_data, state = server.authenticate_begin(credentials) - request.session['fido_state'] = state - return HttpResponse(cbor.encode(auth_data), content_type = "application/octet-stream") + request.session["fido_state"] = state + return HttpResponse(cbor.encode(auth_data), content_type="application/octet-stream") @csrf_exempt @@ -121,49 +158,76 @@ def authenticate_complete(request): server = getServer() credentials = getUserCredentials(username) data = cbor.decode(request.body) - credential_id = data['credentialId'] - client_data = CollectedClientData(data['clientDataJSON']) - auth_data = AuthenticatorData(data['authenticatorData']) - signature = data['signature'] + credential_id = data["credentialId"] + client_data = CollectedClientData(data["clientDataJSON"]) + auth_data = AuthenticatorData(data["authenticatorData"]) + signature = data["signature"] try: cred = server.authenticate_complete( - request.session.pop('fido_state'), + request.session.pop("fido_state"), credentials, credential_id, client_data, auth_data, - signature + signature, ) except ValueError: - return HttpResponse(simplejson.dumps({'status': "ERR", - "message": "Wrong challenge received, make sure that this is your security and try again."}), - content_type = "application/json") + return HttpResponse( + simplejson.dumps( + { + "status": "ERR", + "message": "Wrong challenge received, make sure that this is your security and try again.", + } + ), + content_type="application/json", + ) except Exception as excep: try: from raven.contrib.django.raven_compat.models import client + client.captureException() except: pass - return HttpResponse(simplejson.dumps({'status': "ERR", - "message": str(excep)}), - content_type = "application/json") + return HttpResponse( + simplejson.dumps({"status": "ERR", "message": str(excep)}), + content_type="application/json", + ) if request.session.get("mfa_recheck", False): import time + request.session["mfa"]["rechecked_at"] = time.time() - return HttpResponse(simplejson.dumps({'status': "OK"}), - content_type = "application/json") + return HttpResponse( + simplejson.dumps({"status": "OK"}), content_type="application/json" + ) else: import random - keys = User_Keys.objects.filter(username = username, key_type = "FIDO2", enabled = 1) + + keys = User_Keys.objects.filter( + username=username, key_type="FIDO2", enabled=1 + ) for k in keys: - if AttestedCredentialData(websafe_decode(k.properties["device"])).credential_id == cred.credential_id: + if ( + AttestedCredentialData( + websafe_decode(k.properties["device"]) + ).credential_id + == cred.credential_id + ): k.last_used = timezone.now() k.save() - mfa = {"verified": True, "method": "FIDO2", 'id': k.id} + mfa = {"verified": True, "method": "FIDO2", "id": k.id} if getattr(settings, "MFA_RECHECK", False): - mfa["next_check"] = datetime.datetime.timestamp((datetime.datetime.now() + datetime.timedelta( - seconds = random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX)))) + mfa["next_check"] = datetime.datetime.timestamp( + ( + datetime.datetime.now() + + datetime.timedelta( + seconds=random.randint( + settings.MFA_RECHECK_MIN, + settings.MFA_RECHECK_MAX, + ) + ) + ) + ) request.session["mfa"] = mfa try: authenticated = request.user.is_authenticated @@ -171,11 +235,20 @@ def authenticate_complete(request): authenticated = request.user.is_authenticated() if not authenticated: res = login(request) - if not "location" in res: return reset_cookie(request) - return HttpResponse(simplejson.dumps({'status': "OK", "redirect": res["location"]}), - content_type = "application/json") - return HttpResponse(simplejson.dumps({'status': "OK"}), - content_type = "application/json") + if not "location" in res: + return reset_cookie(request) + return HttpResponse( + simplejson.dumps( + {"status": "OK", "redirect": res["location"]} + ), + content_type="application/json", + ) + return HttpResponse( + simplejson.dumps({"status": "OK"}), + content_type="application/json", + ) except Exception as exp: - return HttpResponse(simplejson.dumps({'status': "ERR", "message": exp.message}), - content_type = "application/json") + return HttpResponse( + simplejson.dumps({"status": "ERR", "message": exp.message}), + content_type="application/json", + ) diff --git a/mfa/TrustedDevice.py b/mfa/TrustedDevice.py index a7664fe..e11a5d8 100644 --- a/mfa/TrustedDevice.py +++ b/mfa/TrustedDevice.py @@ -1,139 +1,178 @@ import string import random +from datetime import datetime, timedelta from django.shortcuts import render from django.http import HttpResponse -from django.template.context import RequestContext from django.template.context_processors import csrf -from .models import * import user_agents from django.utils import timezone from django.urls import reverse +from .models import User_Keys + + def id_generator(size=6, chars=string.ascii_uppercase + string.digits): - x=''.join(random.choice(chars) for _ in range(size)) - if not User_Keys.objects.filter(properties__icontains='"key": "%s"'%x).exists(): return x - else: return id_generator(size,chars) + x = "".join(random.choice(chars) for _ in range(size)) + if not User_Keys.objects.filter(properties__icontains='"key": "%s"' % x).exists(): + return x + return id_generator(size, chars) + def getUserAgent(request): - id=id=request.session.get("td_id",None) - if id: - tk=User_Keys.objects.get(id=id) - if tk.properties.get("user_agent","")!="": + device_id = request.session.get("td_id", None) + if device_id: + tk = User_Keys.objects.get(id=device_id) + if tk.properties.get("user_agent", "") != "": ua = user_agents.parse(tk.properties["user_agent"]) - res = render(None, "TrustedDevices/user-agent.html", context={"ua":ua}) + res = render(None, "TrustedDevices/user-agent.html", context={"ua": ua}) return HttpResponse(res) - return HttpResponse("") + return HttpResponse("No Device provide", status=401) + def trust_device(request): tk = User_Keys.objects.get(id=request.session["td_id"]) - tk.properties["status"]="trusted" + tk.properties["status"] = "trusted" tk.save() del request.session["td_id"] return HttpResponse("OK") + def checkTrusted(request): res = "" - id=request.session.get("td_id","") - if id!="": + id = request.session.get("td_id", "") + if id != "": try: tk = User_Keys.objects.get(id=id) - if tk.properties["status"] == "trusted": res = "OK" + if tk.properties["status"] == "trusted": + res = "OK" except: pass return HttpResponse(res) + def getCookie(request): tk = User_Keys.objects.get(id=request.session["td_id"]) if tk.properties["status"] == "trusted": - context={"added":True} - response = render(request,"TrustedDevices/Done.html", context) - from datetime import datetime, timedelta + context = {"added": True} + response = render(request, "TrustedDevices/Done.html", context) + expires = datetime.now() + timedelta(days=180) - tk.expires=expires + tk.expires = expires tk.save() response.set_cookie("deviceid", tk.properties["signature"], expires=expires) return response + def add(request): - context=csrf(request) - if request.method=="GET": - context.update({"username":request.GET.get('u',''),"key":request.GET.get('k','')}) - return render(request,"TrustedDevices/Add.html",context) + context = csrf(request) + if request.method == "GET": + context.update( + {"username": request.GET.get("u", ""), "key": request.GET.get("k", "")} + ) + return render(request, "TrustedDevices/Add.html", context) else: - key=request.POST["key"].replace("-","").replace(" ","").upper() + key = request.POST["key"].replace("-", "").replace(" ", "").upper() context["username"] = request.POST["username"] context["key"] = request.POST["key"] - trusted_keys=User_Keys.objects.filter(username=request.POST["username"],properties__icontains='"key": "%s"'%key) - cookie=False + trusted_keys = User_Keys.objects.filter( + username=request.POST["username"], properties__icontains='"key": "%s"' % key + ) + cookie = False if trusted_keys.exists(): - tk=trusted_keys[0] - request.session["td_id"]=tk.id - ua=request.META['HTTP_USER_AGENT'] - agent=user_agents.parse(ua) + tk = trusted_keys[0] + request.session["td_id"] = tk.id + ua = request.META["HTTP_USER_AGENT"] + agent = user_agents.parse(ua) if agent.is_pc: - context["invalid"]="This is a PC, it can't used as a trusted device." + context["invalid"] = "This is a PC, it can't used as a trusted device." else: - tk.properties["user_agent"]=ua + tk.properties["user_agent"] = ua tk.save() - context["success"]=True + context["success"] = True # tk.properties["user_agent"]=ua # tk.save() # context["success"]=True else: - context["invalid"]="The username or key is wrong, please check and try again." + context[ + "invalid" + ] = "The username or key is wrong, please check and try again." + + return render(request, "TrustedDevices/Add.html", context) - return render(request,"TrustedDevices/Add.html", context) def start(request): - if User_Keys.objects.filter(username=request.user.username,key_type="Trusted Device").count()>= 2: - return render(request,"TrustedDevices/start.html",{"not_allowed":True}) - td=None - if not request.session.get("td_id",None): - td=User_Keys() - td.username=request.user.username - td.properties={"key":id_generator(),"status":"adding"} - td.key_type="Trusted Device" + if ( + User_Keys.objects.filter( + username=request.user.username, key_type="Trusted Device" + ).count() + >= 2 + ): + return render(request, "TrustedDevices/start.html", {"not_allowed": True}) + td = None + if not request.session.get("td_id", None): + td = User_Keys() + td.username = request.user.username + td.properties = {"key": id_generator(), "status": "adding"} + td.key_type = "Trusted Device" td.save() - request.session["td_id"]=td.id + request.session["td_id"] = td.id try: - if td==None: td=User_Keys.objects.get(id=request.session["td_id"]) - context={"key":td.properties["key"],"url":request.scheme+"://"+request.get_host() + reverse('add_trusted_device')} + if td == None: + td = User_Keys.objects.get(id=request.session["td_id"]) + context = { + "key": td.properties["key"], + "url": request.scheme + + "://" + + request.get_host() + + reverse("add_trusted_device"), + } except: del request.session["td_id"] return start(request) - return render(request,"TrustedDevices/start.html",context) + return render(request, "TrustedDevices/start.html", context) + def send_email(request): - body=render(request,"TrustedDevices/email.html",{}).content + body = render(request, "TrustedDevices/email.html", {}).content from .Common import send - e=request.user.email - if e=="": - e=request.session.get("user",{}).get("email","") - if e=="": + + e = request.user.email + if e == "": + e = request.session.get("user", {}).get("email", "") + if e == "": res = "User has no email on the system." - elif send([e],"Add Trusted Device Link",body): - res="Sent Successfully" + elif send([e], "Add Trusted Device Link", body): + res = "Sent Successfully" else: - res="Error occured, please try again later." + res = "Error occured, please try again later." return HttpResponse(res) def verify(request): - if request.COOKIES.get('deviceid',None): + if request.COOKIES.get("deviceid", None): from jose import jwt - json= jwt.decode(request.COOKIES.get('deviceid'),settings.SECRET_KEY) - if json["username"].lower()== request.session['base_username'].lower(): + + json = jwt.decode(request.COOKIES.get("deviceid"), settings.SECRET_KEY) + if json["username"].lower() == request.session["base_username"].lower(): try: - uk = User_Keys.objects.get(username=request.POST["username"].lower(), properties__icontains='"key": "%s"'%json["key"]) + uk = User_Keys.objects.get( + username=request.POST["username"].lower(), + properties__icontains='"key": "%s"' % json["key"], + ) if uk.enabled and uk.properties["status"] == "trusted": - uk.last_used=timezone.now() + uk.last_used = timezone.now() uk.save() - request.session["mfa"] = {"verified": True, "method": "Trusted Device","id":uk.id} + request.session["mfa"] = { + "verified": True, + "method": "Trusted Device", + "id": uk.id, + } return True except: import traceback + print(traceback.format_exc()) return False return False diff --git a/mfa/U2F.py b/mfa/U2F.py index bf247cc..2bed454 100644 --- a/mfa/U2F.py +++ b/mfa/U2F.py @@ -1,123 +1,171 @@ - -from u2flib_server.u2f import (begin_registration, begin_authentication, - complete_registration, complete_authentication) +from u2flib_server.u2f import ( + begin_registration, + begin_authentication, + complete_registration, + complete_authentication, +) from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.serialization import Encoding from django.shortcuts import render import simplejson -#from django.template.context import RequestContext + from django.template.context_processors import csrf -from django.conf import settings from django.http import HttpResponse -from .models import * +from django.conf import settings +from .models import User_Keys from .views import login from .Common import get_redirect_url -import datetime from django.utils import timezone + def recheck(request): context = csrf(request) - context["mode"]="recheck" + context["mode"] = "recheck" s = sign(request.user.username) request.session["_u2f_challenge_"] = s[0] context["token"] = s[1] - request.session["mfa_recheck"]=True - return render(request,"U2F/recheck.html", context) + request.session["mfa_recheck"] = True + return render(request, "U2F/recheck.html", context) + def process_recheck(request): - x=validate(request,request.user.username) - if x==True: + x = validate(request, request.user.username) + if x == True: import time + request.session["mfa"]["rechecked_at"] = time.time() - return HttpResponse(simplejson.dumps({"recheck":True}),content_type="application/json") + return HttpResponse( + simplejson.dumps({"recheck": True}), content_type="application/json" + ) return x + def check_errors(request, data): if "errorCode" in data: - if data["errorCode"] == 0: return True + if data["errorCode"] == 0: + return True if data["errorCode"] == 4: return HttpResponse("Invalid Security Key") if data["errorCode"] == 1: return auth(request) return True -def validate(request,username): + + +def validate(request, username): import datetime, random data = simplejson.loads(request.POST["response"]) - res= check_errors(request,data) - if res!=True: + res = check_errors(request, data) + if res != True: return res - challenge = request.session.pop('_u2f_challenge_') + challenge = request.session.pop("_u2f_challenge_") device, c, t = complete_authentication(challenge, data, [settings.U2F_APPID]) try: - key=User_Keys.objects.get(username=username,properties__icontains='"publicKey": "%s"'%device["publicKey"]) - key.last_used=timezone.now() + key = User_Keys.objects.get( + username=username, + properties__icontains='"publicKey": "%s"' % device["publicKey"], + ) + key.last_used = timezone.now() key.save() - mfa = {"verified": True, "method": "U2F","id":key.id} + mfa = {"verified": True, "method": "U2F", "id": key.id} if getattr(settings, "MFA_RECHECK", False): - mfa["next_check"] = datetime.datetime.timestamp((datetime.datetime.now() - + datetime.timedelta( - seconds=random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX)))) + mfa["next_check"] = datetime.datetime.timestamp( + ( + datetime.datetime.now() + + datetime.timedelta( + seconds=random.randint( + settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX + ) + ) + ) + ) request.session["mfa"] = mfa return True except: return False - def auth(request): - context=csrf(request) - s=sign(request.session["base_username"]) - request.session["_u2f_challenge_"]=s[0] - context["token"]=s[1] - context["method"] = {"name": getattr(settings, "MFA_RENAME_METHODS", {}).get("U2F", "Classical Security Key")} - return render(request,"U2F/Auth.html",context) + context = csrf(request) + s = sign(request.session["base_username"]) + request.session["_u2f_challenge_"] = s[0] + context["token"] = s[1] + context["method"] = { + "name": getattr(settings, "MFA_RENAME_METHODS", {}).get( + "U2F", "Classical Security Key" + ) + } + return render(request, "U2F/Auth.html", context) + def start(request): enroll = begin_registration(settings.U2F_APPID, []) - request.session['_u2f_enroll_'] = enroll.json - context=csrf(request) - context["token"]=simplejson.dumps(enroll.data_for_client) + request.session["_u2f_enroll_"] = enroll.json + context = csrf(request) + context["token"] = simplejson.dumps(enroll.data_for_client) context.update(get_redirect_url()) - context["method"] = {"name": getattr(settings, "MFA_RENAME_METHODS", {}).get("U2F", "Classical Security Key")} - context["RECOVERY_METHOD"] = getattr(settings, "MFA_RENAME_METHODS", {}).get("RECOVERY", "Recovery codes") - return render(request,"U2F/Add.html",context) + context["method"] = { + "name": getattr(settings, "MFA_RENAME_METHODS", {}).get( + "U2F", "Classical Security Key" + ) + } + context["RECOVERY_METHOD"] = getattr(settings, "MFA_RENAME_METHODS", {}).get( + "RECOVERY", "Recovery codes" + ) + return render(request, "U2F/Add.html", context) def bind(request): import hashlib - enroll = request.session['_u2f_enroll_'] - data=simplejson.loads(request.POST["response"]) + + enroll = request.session["_u2f_enroll_"] + data = simplejson.loads(request.POST["response"]) device, cert = complete_registration(enroll, data, [settings.U2F_APPID]) cert = x509.load_der_x509_certificate(cert, default_backend()) - cert_hash=hashlib.md5(cert.public_bytes(Encoding.PEM)).hexdigest() - q=User_Keys.objects.filter(key_type="U2F", properties__icontains= cert_hash) + cert_hash = hashlib.md5(cert.public_bytes(Encoding.PEM)).hexdigest() + q = User_Keys.objects.filter(key_type="U2F", properties__icontains=cert_hash) if q.exists(): - return HttpResponse("This key is registered before, it can't be registered again.") - User_Keys.objects.filter(username=request.user.username,key_type="U2F").delete() + return HttpResponse( + "This key is registered before, it can't be registered again." + ) + User_Keys.objects.filter(username=request.user.username, key_type="U2F").delete() uk = User_Keys() uk.username = request.user.username uk.owned_by_enterprise = getattr(settings, "MFA_OWNED_BY_ENTERPRISE", False) - uk.properties = {"device":simplejson.loads(device.json),"cert":cert_hash} + uk.properties = {"device": simplejson.loads(device.json), "cert": cert_hash} uk.key_type = "U2F" uk.save() - if getattr(settings, 'MFA_ENFORCE_RECOVERY_METHOD', False) and not User_Keys.objects.filter(key_type="RECOVERY", - username=request.user.username).exists(): - request.session["mfa_reg"] = {"method": "U2F", - "name": getattr(settings, "MFA_RENAME_METHODS", {}).get("U2F", "Classical Security Key")} - return HttpResponse('RECOVERY') + if ( + getattr(settings, "MFA_ENFORCE_RECOVERY_METHOD", False) + and not User_Keys.objects.filter( + key_type="RECOVERY", username=request.user.username + ).exists() + ): + request.session["mfa_reg"] = { + "method": "U2F", + "name": getattr(settings, "MFA_RENAME_METHODS", {}).get( + "U2F", "Classical Security Key" + ), + } + return HttpResponse("RECOVERY") return HttpResponse("OK") + def sign(username): - u2f_devices=[d.properties["device"] for d in User_Keys.objects.filter(username=username,key_type="U2F")] + u2f_devices = [ + d.properties["device"] + for d in User_Keys.objects.filter(username=username, key_type="U2F") + ] challenge = begin_authentication(settings.U2F_APPID, u2f_devices) - return [challenge.json,simplejson.dumps(challenge.data_for_client)] + return [challenge.json, simplejson.dumps(challenge.data_for_client)] + def verify(request): - x= validate(request,request.session["base_username"]) - if x==True: + x = validate(request, request.session["base_username"]) + if x == True: return login(request) - else: return x + else: + return x diff --git a/mfa/__init__.py b/mfa/__init__.py index d9d106a..8a124bf 100644 --- a/mfa/__init__.py +++ b/mfa/__init__.py @@ -1 +1 @@ -__version__="2.2.0" +__version__ = "2.2.0" diff --git a/mfa/apps.py b/mfa/apps.py index cb5ecca..f9816d4 100644 --- a/mfa/apps.py +++ b/mfa/apps.py @@ -1,4 +1,6 @@ from django.apps import AppConfig + + class myAppNameConfig(AppConfig): - name = 'mfa' - verbose_name = 'A Much Better Name' \ No newline at end of file + name = "mfa" + verbose_name = "A Much Better Name" diff --git a/mfa/helpers.py b/mfa/helpers.py index a61c769..39e85b7 100644 --- a/mfa/helpers.py +++ b/mfa/helpers.py @@ -1,32 +1,46 @@ -import pyotp -from .models import * -from . import TrustedDevice, U2F, FIDO2, totp import simplejson from django.shortcuts import HttpResponse -from mfa.views import verify,goto -def has_mfa(request,username): - if User_Keys.objects.filter(username=username,enabled=1).count()>0: +from mfa.views import verify +from . import TrustedDevice, U2F, FIDO2, totp +from .models import User_Keys + + +def has_mfa(request, username): + if User_Keys.objects.filter(username=username, enabled=1).count() > 0: return verify(request, username) return False -def is_mfa(request,ignore_methods=[]): - if request.session.get("mfa",{}).get("verified",False): - if not request.session.get("mfa",{}).get("method",None) in ignore_methods: + +def is_mfa(request, ignore_methods=[]): + if request.session.get("mfa", {}).get("verified", False): + if not request.session.get("mfa", {}).get("method", None) in ignore_methods: return True return False + def recheck(request): - method=request.session.get("mfa",{}).get("method",None) + method = request.session.get("mfa", {}).get("method", None) if not method: - return HttpResponse(simplejson.dumps({"res":False}),content_type="application/json") - if method=="Trusted Device": - return HttpResponse(simplejson.dumps({"res":TrustedDevice.verify(request)}),content_type="application/json") - elif method=="U2F": - return HttpResponse(simplejson.dumps({"html": U2F.recheck(request).content}), content_type="application/json") + return HttpResponse( + simplejson.dumps({"res": False}), content_type="application/json" + ) + if method == "Trusted Device": + return HttpResponse( + simplejson.dumps({"res": TrustedDevice.verify(request)}), + content_type="application/json", + ) + elif method == "U2F": + return HttpResponse( + simplejson.dumps({"html": U2F.recheck(request).content}), + content_type="application/json", + ) elif method == "FIDO2": - return HttpResponse(simplejson.dumps({"html": FIDO2.recheck(request).content}), content_type="application/json") - elif method=="TOTP": - return HttpResponse(simplejson.dumps({"html": totp.recheck(request).content}), content_type="application/json") - - - + return HttpResponse( + simplejson.dumps({"html": FIDO2.recheck(request).content}), + content_type="application/json", + ) + elif method == "TOTP": + return HttpResponse( + simplejson.dumps({"html": totp.recheck(request).content}), + content_type="application/json", + ) diff --git a/mfa/middleware.py b/mfa/middleware.py index 4923416..416cc48 100644 --- a/mfa/middleware.py +++ b/mfa/middleware.py @@ -1,13 +1,25 @@ import time from django.http import HttpResponseRedirect -from django.core.urlresolvers import reverse + +try: + from django.urls import reverse +except ImportError: + from django.core.urlresolvers import reverse # pyre-ignore[21] + + from django.conf import settings + + def process(request): - next_check=request.session.get('mfa',{}).get("next_check",False) - if not next_check: return None - now=int(time.time()) + next_check = request.session.get("mfa", {}).get("next_check", False) + if not next_check: + return None + now = int(time.time()) if now >= next_check: - method=request.session["mfa"]["method"] + method = request.session["mfa"]["method"] path = request.META["PATH_INFO"] - return HttpResponseRedirect(reverse(method+"_auth")+"?next=%s"%(settings.BASE_URL + path).replace("//", "/")) - return None \ No newline at end of file + return HttpResponseRedirect( + reverse(method + "_auth") + + "?next=%s" % (settings.BASE_URL + path).replace("//", "/") + ) + return None diff --git a/mfa/migrations/0001_initial.py b/mfa/migrations/0001_initial.py index c243f37..e280050 100644 --- a/mfa/migrations/0001_initial.py +++ b/mfa/migrations/0001_initial.py @@ -5,18 +5,24 @@ class Migration(migrations.Migration): - - dependencies = [ - ] + dependencies = [] operations = [ migrations.CreateModel( - name='User_Keys', + name="User_Keys", fields=[ - ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), - ('username', models.CharField(max_length=50)), - ('secret_key', models.CharField(max_length=15)), - ('added_on', models.DateTimeField(auto_now_add=True)), + ( + "id", + models.AutoField( + verbose_name="ID", + serialize=False, + auto_created=True, + primary_key=True, + ), + ), + ("username", models.CharField(max_length=50)), + ("secret_key", models.CharField(max_length=15)), + ("added_on", models.DateTimeField(auto_now_add=True)), ], ), ] diff --git a/mfa/migrations/0002_user_keys_key_type.py b/mfa/migrations/0002_user_keys_key_type.py index 5cb4aef..a341a38 100644 --- a/mfa/migrations/0002_user_keys_key_type.py +++ b/mfa/migrations/0002_user_keys_key_type.py @@ -5,15 +5,14 @@ class Migration(migrations.Migration): - dependencies = [ - ('mfa', '0001_initial'), + ("mfa", "0001_initial"), ] operations = [ migrations.AddField( - model_name='user_keys', - name='key_type', - field=models.CharField(default=b'TOTP', max_length=25), + model_name="user_keys", + name="key_type", + field=models.CharField(default=b"TOTP", max_length=25), ), ] diff --git a/mfa/migrations/0003_auto_20181114_2159.py b/mfa/migrations/0003_auto_20181114_2159.py index 49dfd5e..3d6e5e5 100644 --- a/mfa/migrations/0003_auto_20181114_2159.py +++ b/mfa/migrations/0003_auto_20181114_2159.py @@ -5,15 +5,14 @@ class Migration(migrations.Migration): - dependencies = [ - ('mfa', '0002_user_keys_key_type'), + ("mfa", "0002_user_keys_key_type"), ] operations = [ migrations.AlterField( - model_name='user_keys', - name='secret_key', + model_name="user_keys", + name="secret_key", field=models.CharField(max_length=32), ), ] diff --git a/mfa/migrations/0004_user_keys_enabled.py b/mfa/migrations/0004_user_keys_enabled.py index b0bcb78..90f7982 100644 --- a/mfa/migrations/0004_user_keys_enabled.py +++ b/mfa/migrations/0004_user_keys_enabled.py @@ -5,15 +5,14 @@ class Migration(migrations.Migration): - dependencies = [ - ('mfa', '0003_auto_20181114_2159'), + ("mfa", "0003_auto_20181114_2159"), ] operations = [ migrations.AddField( - model_name='user_keys', - name='enabled', + model_name="user_keys", + name="enabled", field=models.BooleanField(default=True), ), ] diff --git a/mfa/migrations/0005_auto_20181115_2014.py b/mfa/migrations/0005_auto_20181115_2014.py index 3f65a26..7e194fb 100644 --- a/mfa/migrations/0005_auto_20181115_2014.py +++ b/mfa/migrations/0005_auto_20181115_2014.py @@ -2,36 +2,39 @@ from __future__ import unicode_literals from django.db import models, migrations + try: from django.db.models import JSONField except ImportError: try: - from jsonfield.fields import JSONField + from jsonfield.fields import JSONField # pyre-ignore[21] except ImportError: - raise ImportError("Can't find a JSONField implementation, please install jsonfield if django < 4.0") - + raise ImportError( + "Can't find a JSONField implementation, please install jsonfield if django < 4.0" + ) def modify_json(apps, schema_editor): from django.conf import settings + if "mysql" in settings.DATABASES.get("default", {}).get("engine", ""): migrations.RunSQL("alter table mfa_user_keys modify column properties json;") class Migration(migrations.Migration): dependencies = [ - ('mfa', '0004_user_keys_enabled'), + ("mfa", "0004_user_keys_enabled"), ] operations = [ migrations.RemoveField( - model_name='user_keys', - name='secret_key', + model_name="user_keys", + name="secret_key", ), migrations.AddField( - model_name='user_keys', - name='properties', + model_name="user_keys", + name="properties", field=JSONField(null=True), ), - migrations.RunPython(modify_json) + migrations.RunPython(modify_json), ] diff --git a/mfa/migrations/0006_trusted_devices.py b/mfa/migrations/0006_trusted_devices.py index de14a0e..7ce08bd 100644 --- a/mfa/migrations/0006_trusted_devices.py +++ b/mfa/migrations/0006_trusted_devices.py @@ -5,23 +5,30 @@ class Migration(migrations.Migration): - dependencies = [ - ('mfa', '0005_auto_20181115_2014'), + ("mfa", "0005_auto_20181115_2014"), ] operations = [ migrations.CreateModel( - name='Trusted_Devices', + name="Trusted_Devices", fields=[ - ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), - ('signature', models.CharField(max_length=255)), - ('key', models.CharField(max_length=6)), - ('username', models.CharField(max_length=50)), - ('user_agent', models.CharField(max_length=255)), - ('status', models.CharField(default=b'adding', max_length=255)), - ('added_on', models.DateTimeField(auto_now_add=True)), - ('last_used', models.DateTimeField(default=None, null=True)), + ( + "id", + models.AutoField( + verbose_name="ID", + serialize=False, + auto_created=True, + primary_key=True, + ), + ), + ("signature", models.CharField(max_length=255)), + ("key", models.CharField(max_length=6)), + ("username", models.CharField(max_length=50)), + ("user_agent", models.CharField(max_length=255)), + ("status", models.CharField(default=b"adding", max_length=255)), + ("added_on", models.DateTimeField(auto_now_add=True)), + ("last_used", models.DateTimeField(default=None, null=True)), ], ), ] diff --git a/mfa/migrations/0007_auto_20181230_1549.py b/mfa/migrations/0007_auto_20181230_1549.py index 965bb9e..ba28391 100644 --- a/mfa/migrations/0007_auto_20181230_1549.py +++ b/mfa/migrations/0007_auto_20181230_1549.py @@ -5,18 +5,17 @@ class Migration(migrations.Migration): - dependencies = [ - ('mfa', '0006_trusted_devices'), + ("mfa", "0006_trusted_devices"), ] operations = [ migrations.DeleteModel( - name='Trusted_Devices', + name="Trusted_Devices", ), migrations.AddField( - model_name='user_keys', - name='expires', + model_name="user_keys", + name="expires", field=models.DateTimeField(default=None, null=True, blank=True), ), ] diff --git a/mfa/migrations/0008_user_keys_last_used.py b/mfa/migrations/0008_user_keys_last_used.py index b555762..a8f46e9 100644 --- a/mfa/migrations/0008_user_keys_last_used.py +++ b/mfa/migrations/0008_user_keys_last_used.py @@ -5,15 +5,14 @@ class Migration(migrations.Migration): - dependencies = [ - ('mfa', '0007_auto_20181230_1549'), + ("mfa", "0007_auto_20181230_1549"), ] operations = [ migrations.AddField( - model_name='user_keys', - name='last_used', + model_name="user_keys", + name="last_used", field=models.DateTimeField(default=None, null=True, blank=True), ), ] diff --git a/mfa/migrations/0009_user_keys_owned_by_enterprise.py b/mfa/migrations/0009_user_keys_owned_by_enterprise.py index 9185fcc..05022c3 100644 --- a/mfa/migrations/0009_user_keys_owned_by_enterprise.py +++ b/mfa/migrations/0009_user_keys_owned_by_enterprise.py @@ -6,21 +6,22 @@ def update_owned_by_enterprise(apps, schema_editor): - user_keys = apps.get_model('mfa', 'user_keys') - user_keys.objects.filter(key_type='FIDO2').update(owned_by_enterprise=getattr(settings,"MFA_OWNED_BY_ENTERPRISE",False)) + user_keys = apps.get_model("mfa", "user_keys") + user_keys.objects.filter(key_type="FIDO2").update( + owned_by_enterprise=getattr(settings, "MFA_OWNED_BY_ENTERPRISE", False) + ) class Migration(migrations.Migration): - dependencies = [ - ('mfa', '0008_user_keys_last_used'), + ("mfa", "0008_user_keys_last_used"), ] operations = [ migrations.AddField( - model_name='user_keys', - name='owned_by_enterprise', + model_name="user_keys", + name="owned_by_enterprise", field=models.NullBooleanField(default=None), ), - migrations.RunPython(update_owned_by_enterprise) + migrations.RunPython(update_owned_by_enterprise), ] diff --git a/mfa/migrations/0010_auto_20201110_0557.py b/mfa/migrations/0010_auto_20201110_0557.py index 48190b6..09c0714 100644 --- a/mfa/migrations/0010_auto_20201110_0557.py +++ b/mfa/migrations/0010_auto_20201110_0557.py @@ -4,15 +4,14 @@ class Migration(migrations.Migration): - dependencies = [ - ('mfa', '0009_user_keys_owned_by_enterprise'), + ("mfa", "0009_user_keys_owned_by_enterprise"), ] operations = [ migrations.AlterField( - model_name='user_keys', - name='key_type', - field=models.CharField(default='TOTP', max_length=25), + model_name="user_keys", + name="key_type", + field=models.CharField(default="TOTP", max_length=25), ), ] diff --git a/mfa/migrations/0011_auto_20210530_0622.py b/mfa/migrations/0011_auto_20210530_0622.py index 3d215c2..19bf003 100644 --- a/mfa/migrations/0011_auto_20210530_0622.py +++ b/mfa/migrations/0011_auto_20210530_0622.py @@ -4,15 +4,14 @@ class Migration(migrations.Migration): - dependencies = [ - ('mfa', '0010_auto_20201110_0557'), + ("mfa", "0010_auto_20201110_0557"), ] operations = [ migrations.AlterField( - model_name='user_keys', - name='owned_by_enterprise', + model_name="user_keys", + name="owned_by_enterprise", field=models.BooleanField(blank=True, default=None, null=True), ), ] diff --git a/mfa/models.py b/mfa/models.py index 9d0ac12..d5c205e 100644 --- a/mfa/models.py +++ b/mfa/models.py @@ -1,39 +1,51 @@ from django.db import models +from jose import jwt +from django.conf import settings + try: from django.db.models import JSONField except ModuleNotFoundError: try: - from jsonfield import JSONField - except ModuleNotFoundError: - raise ModuleNotFoundError("Can't find a JSONField implementation, please install jsonfield if django < 4.0") - -from jose import jwt -from django.conf import settings -#from jsonLookup import shasLookup, hasLookup -# JSONField.register_lookup(shasLookup) -# JSONField.register_lookup(hasLookup) + from jsonfield import JSONField # pyre-ignore[21] + except ModuleNotFoundError as exc: + raise ModuleNotFoundError( + "Can't find a JSONField implementation, please install jsonfield if django < 4.0" + ) class User_Keys(models.Model): - username=models.CharField(max_length = 50) - properties=JSONField(null = True) - added_on=models.DateTimeField(auto_now_add = True) - key_type=models.CharField(max_length = 25,default = "TOTP") - enabled=models.BooleanField(default=True) - expires=models.DateTimeField(null=True,default=None,blank=True) - last_used=models.DateTimeField(null=True,default=None,blank=True) - owned_by_enterprise=models.BooleanField(default=None,null=True,blank=True) + username = models.CharField(max_length=50) + properties = JSONField(null=True) + added_on = models.DateTimeField(auto_now_add=True) + key_type = models.CharField(max_length=25, default="TOTP") + enabled = models.BooleanField(default=True) + expires = models.DateTimeField(null=True, default=None, blank=True) + last_used = models.DateTimeField(null=True, default=None, blank=True) + owned_by_enterprise = models.BooleanField(default=None, null=True, blank=True) - def save(self, force_insert=False, force_update=False, using=None, update_fields=None): - if self.key_type == "Trusted Device" and self.properties.get("signature","") == "": - self.properties["signature"]= jwt.encode({"username": self.username, "key": self.properties["key"]}, settings.SECRET_KEY) - super(User_Keys, self).save(force_insert=force_insert, force_update=force_update, using=using, update_fields=update_fields) + def save( + self, force_insert=False, force_update=False, using=None, update_fields=None + ): + if ( + self.key_type == "Trusted Device" + and self.properties.get("signature", "") == "" + ): + self.properties["signature"] = jwt.encode( + {"username": self.username, "key": self.properties["key"]}, + settings.SECRET_KEY, + ) + super(User_Keys, self).save( + force_insert=force_insert, + force_update=force_update, + using=using, + update_fields=update_fields, + ) def __unicode__(self): - return "%s -- %s"%(self.username,self.key_type) + return "%s -- %s" % (self.username, self.key_type) def __str__(self): return self.__unicode__() class Meta: - app_label='mfa' + app_label = "mfa" diff --git a/mfa/recovery.py b/mfa/recovery.py index f2c1547..f7e4870 100644 --- a/mfa/recovery.py +++ b/mfa/recovery.py @@ -1,117 +1,147 @@ +import time +import random +import string +import simplejson + from django.shortcuts import render from django.views.decorators.cache import never_cache from django.template.context_processors import csrf +from django.utils import timezone from django.contrib.auth.hashers import make_password, PBKDF2PasswordHasher from django.http import HttpResponse +from django.conf import settings from .Common import get_redirect_url -from .models import * -import simplejson -import random -import string -import datetime -from django.utils import timezone +from .models import User_Keys USER_FRIENDLY_NAME = "Recovery Codes" + class Hash(PBKDF2PasswordHasher): - algorithm = 'pbkdf2_sha256_custom' - iterations = getattr(settings,"RECOVERY_ITERATION",1) + algorithm = "pbkdf2_sha256_custom" + iterations = getattr(settings, "RECOVERY_ITERATION", 1) + def delTokens(request): - #Only when all MFA have been deactivated, or to generate new ! - #We iterate only to clean if any error happend and multiple entry of RECOVERY created for one user - for key in User_Keys.objects.filter(username=request.user.username, key_type = "RECOVERY"): + # Only when all MFA have been deactivated, or to generate new ! + # We iterate only to clean if any error happend and multiple entry of RECOVERY created for one user + for key in User_Keys.objects.filter( + username=request.user.username, key_type="RECOVERY" + ): if key.username == request.user.username: key.delete() + def randomGen(n): - return ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(n)) + return "".join( + random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) + for _ in range(n) + ) + @never_cache def genTokens(request): - #Delete old ones + # Delete old ones delTokens(request) - #Then generate new one + # Then generate new one salt = randomGen(15) hashedKeys = [] clearKeys = [] for i in range(5): - token = randomGen(5) + "-" + randomGen(5) - hashedToken = make_password(token, salt, 'pbkdf2_sha256_custom') - hashedKeys.append(hashedToken) - clearKeys.append(token) - uk=User_Keys() + token = randomGen(5) + "-" + randomGen(5) + hashedToken = make_password(token, salt, "pbkdf2_sha256_custom") + hashedKeys.append(hashedToken) + clearKeys.append(token) + uk = User_Keys() uk.username = request.user.username - uk.properties={"secret_keys":hashedKeys, "salt":salt} - uk.key_type="RECOVERY" + uk.properties = {"secret_keys": hashedKeys, "salt": salt} + uk.key_type = "RECOVERY" uk.enabled = True uk.save() - return HttpResponse(simplejson.dumps({"keys":clearKeys})) + return HttpResponse(simplejson.dumps({"keys": clearKeys})) def verify_login(request, username, token): - for key in User_Keys.objects.filter(username=username, key_type = "RECOVERY"): + for key in User_Keys.objects.filter(username=username, key_type="RECOVERY"): secret_keys = key.properties["secret_keys"] salt = key.properties["salt"] hashedToken = make_password(token, salt, "pbkdf2_sha256_custom") - for i,token in enumerate(secret_keys): + for i, token in enumerate(secret_keys): if hashedToken == token: secret_keys.pop(i) key.properties["secret_keys"] = secret_keys - key.last_used= timezone.now() + key.last_used = timezone.now() key.save() return [True, key.id, len(secret_keys) == 0] return [False] + def getTokenLeft(request): - uk = User_Keys.objects.filter(username=request.user.username, key_type = "RECOVERY") - keyLeft=0 + uk = User_Keys.objects.filter(username=request.user.username, key_type="RECOVERY") + keyLeft = 0 for key in uk: keyLeft += len(key.properties["secret_keys"]) - return HttpResponse(simplejson.dumps({"left":keyLeft})) + return HttpResponse(simplejson.dumps({"left": keyLeft})) + def recheck(request): context = csrf(request) - context["mode"]="recheck" + context["mode"] = "recheck" if request.method == "POST": - if verify_login(request,request.user.username, token=request.POST["recovery"])[0]: - import time + if verify_login(request, request.user.username, token=request.POST["recovery"])[ + 0 + ]: request.session["mfa"]["rechecked_at"] = time.time() - return HttpResponse(simplejson.dumps({"recheck": True}), content_type="application/json") + return HttpResponse( + simplejson.dumps({"recheck": True}), content_type="application/json" + ) else: - return HttpResponse(simplejson.dumps({"recheck": False}), content_type="application/json") - return render(request,"RECOVERY/recheck.html", context) + return HttpResponse( + simplejson.dumps({"recheck": False}), content_type="application/json" + ) + return render(request, "RECOVERY/recheck.html", context) + @never_cache def auth(request): from .views import login - context=csrf(request) - if request.method=="POST": + + context = csrf(request) + if request.method == "POST": tokenLength = len(request.POST["recovery"]) if tokenLength == 11 and "RECOVERY" not in settings.MFA_UNALLOWED_METHODS: - #Backup code check - resBackup=verify_login(request, request.session["base_username"], token=request.POST["recovery"]) + # Backup code check + resBackup = verify_login( + request, + request.session["base_username"], + token=request.POST["recovery"], + ) if resBackup[0]: - mfa = {"verified": True, "method": "RECOVERY","id":resBackup[1], "lastBackup":resBackup[2]} + mfa = { + "verified": True, + "method": "RECOVERY", + "id": resBackup[1], + "lastBackup": resBackup[2], + } # if getattr(settings, "MFA_RECHECK", False): # mfa["next_check"] = datetime.datetime.timestamp((datetime.datetime.now() # + datetime.timedelta( # seconds=random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX)))) request.session["mfa"] = mfa if resBackup[2]: - #If the last bakup code has just been used, we return a response insead of redirecting to login + # If the last bakup code has just been used, we return a response insead of redirecting to login context["lastBackup"] = True - return render(request,"RECOVERY/Auth.html", context) + return render(request, "RECOVERY/Auth.html", context) return login(request) - context["invalid"]=True + context["invalid"] = True - elif request.method=="GET": + elif request.method == "GET": mfa = request.session.get("mfa") if mfa and mfa["verified"] and mfa["lastBackup"]: return login(request) - return render(request,"RECOVERY/Auth.html", context) + return render(request, "RECOVERY/Auth.html", context) + @never_cache def start(request): @@ -119,4 +149,4 @@ def start(request): context = get_redirect_url() if "mfa_reg" in request.session: context["mfa_redirect"] = request.session["mfa_reg"]["name"] - return render(request,"RECOVERY/Add.html",context) \ No newline at end of file + return render(request, "RECOVERY/Add.html", context) diff --git a/mfa/totp.py b/mfa/totp.py index 4cf099c..0429ced 100644 --- a/mfa/totp.py +++ b/mfa/totp.py @@ -1,90 +1,125 @@ +import random +import datetime +import simplejson +import pyotp from django.shortcuts import render from django.views.decorators.cache import never_cache from django.http import HttpResponse -from .Common import get_redirect_url -from .models import * from django.template.context_processors import csrf -import simplejson from django.conf import settings -import pyotp -from .views import login -import datetime from django.utils import timezone -import random +from .views import login +from .Common import get_redirect_url +from .models import User_Keys -def verify_login(request,username,token): - for key in User_Keys.objects.filter(username=username,key_type = "TOTP"): +def verify_login(request, username, token): + for key in User_Keys.objects.filter(username=username, key_type="TOTP"): totp = pyotp.TOTP(key.properties["secret_key"]) - if totp.verify(token,valid_window = 30): - key.last_used=timezone.now() + if totp.verify(token, valid_window=30): + key.last_used = timezone.now() key.save() - return [True,key.id] + return [True, key.id] return [False] + def recheck(request): context = csrf(request) - context["mode"]="recheck" + context["mode"] = "recheck" if request.method == "POST": - if verify_login(request,request.user.username, token=request.POST["otp"])[0]: - import time + if verify_login(request, request.user.username, token=request.POST["otp"])[0]: request.session["mfa"]["rechecked_at"] = time.time() - return HttpResponse(simplejson.dumps({"recheck": True}), content_type="application/json") + return HttpResponse( + simplejson.dumps({"recheck": True}), content_type="application/json" + ) else: - return HttpResponse(simplejson.dumps({"recheck": False}), content_type="application/json") - return render(request,"TOTP/recheck.html", context) + return HttpResponse( + simplejson.dumps({"recheck": False}), content_type="application/json" + ) + return render(request, "TOTP/recheck.html", context) + @never_cache def auth(request): - context=csrf(request) - if request.method=="POST": + context = csrf(request) + if request.method == "POST": tokenLength = len(request.POST["otp"]) if tokenLength == 6: - #TOTO code check - res=verify_login(request,request.session["base_username"],token = request.POST["otp"]) + # TOTO code check + res = verify_login( + request, request.session["base_username"], token=request.POST["otp"] + ) if res[0]: - mfa = {"verified": True, "method": "TOTP","id":res[1]} + mfa = {"verified": True, "method": "TOTP", "id": res[1]} if getattr(settings, "MFA_RECHECK", False): - mfa["next_check"] = datetime.datetime.timestamp((datetime.datetime.now() - + datetime.timedelta( - seconds=random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX)))) + mfa["next_check"] = datetime.datetime.timestamp( + ( + datetime.datetime.now() + + datetime.timedelta( + seconds=random.randint( + settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX + ) + ) + ) + ) request.session["mfa"] = mfa return login(request) - context["invalid"]=True - return render(request,"TOTP/Auth.html", context) - + context["invalid"] = True + return render(request, "TOTP/Auth.html", context) def getToken(request): - secret_key=pyotp.random_base32() + secret_key = pyotp.random_base32() totp = pyotp.TOTP(secret_key) - request.session["new_mfa_answer"]=totp.now() - return HttpResponse(simplejson.dumps({"qr":pyotp.totp.TOTP(secret_key).provisioning_uri(str(request.user.username), issuer_name = settings.TOKEN_ISSUER_NAME), - "secret_key": secret_key})) + request.session["new_mfa_answer"] = totp.now() + return HttpResponse( + simplejson.dumps( + { + "qr": pyotp.totp.TOTP(secret_key).provisioning_uri( + str(request.user.username), issuer_name=settings.TOKEN_ISSUER_NAME + ), + "secret_key": secret_key, + } + ) + ) + + def verify(request): - answer=request.GET["answer"] - secret_key=request.GET["key"] + answer = request.GET["answer"] + secret_key = request.GET["key"] totp = pyotp.TOTP(secret_key) - if totp.verify(answer,valid_window = 60): - uk=User_Keys() - uk.username=request.user.username - uk.properties={"secret_key":secret_key} - #uk.name="Authenticatior #%s"%User_Keys.objects.filter(username=user.username,type="TOTP") - uk.key_type="TOTP" + if totp.verify(answer, valid_window=60): + uk = User_Keys() + uk.username = request.user.username + uk.properties = {"secret_key": secret_key} + # uk.name="Authenticatior #%s"%User_Keys.objects.filter(username=user.username,type="TOTP") + uk.key_type = "TOTP" uk.save() - if getattr(settings, 'MFA_ENFORCE_RECOVERY_METHOD', False) and not User_Keys.objects.filter(key_type="RECOVERY", - username=request.user.username).exists(): - request.session["mfa_reg"] = {"method": "TOTP", - "name": getattr(settings, "MFA_RENAME_METHODS", {}).get("TOTP", "TOTP")} + if ( + getattr(settings, "MFA_ENFORCE_RECOVERY_METHOD", False) + and not User_Keys.objects.filter( + key_type="RECOVERY", username=request.user.username + ).exists() + ): + request.session["mfa_reg"] = { + "method": "TOTP", + "name": getattr(settings, "MFA_RENAME_METHODS", {}).get("TOTP", "TOTP"), + } return HttpResponse("RECOVERY") else: return HttpResponse("Success") - else: return HttpResponse("Error") + else: + return HttpResponse("Error") + @never_cache def start(request): """Start Adding Time One Time Password (TOTP)""" context = get_redirect_url() - context["RECOVERY_METHOD"] = getattr(settings, "MFA_RENAME_METHODS", {}).get("RECOVERY", "Recovery codes") - context["method"] = {"name":getattr(settings,"MFA_RENAME_METHODS",{}).get("TOTP","Authenticator")} - return render(request,"TOTP/Add.html",context) + context["RECOVERY_METHOD"] = getattr(settings, "MFA_RENAME_METHODS", {}).get( + "RECOVERY", "Recovery codes" + ) + context["method"] = { + "name": getattr(settings, "MFA_RENAME_METHODS", {}).get("TOTP", "Authenticator") + } + return render(request, "TOTP/Add.html", context) diff --git a/mfa/urls.py b/mfa/urls.py index 6d5e7dc..d0d738c 100644 --- a/mfa/urls.py +++ b/mfa/urls.py @@ -1,56 +1,55 @@ -from . import views,totp,U2F,TrustedDevice,helpers,FIDO2,Email,recovery -#app_name='mfa' +from . import views, totp, U2F, TrustedDevice, helpers, FIDO2, Email, recovery -try: - from django.urls import re_path as url -except: - from django.conf.urls import url -urlpatterns = [ - url(r'totp/start/', totp.start , name="start_new_otop"), - url(r'totp/getToken', totp.getToken , name="get_new_otop"), - url(r'totp/verify', totp.verify, name="verify_otop"), - url(r'totp/auth', totp.auth, name="totp_auth"), - url(r'totp/recheck', totp.recheck, name="totp_recheck"), - - url(r'recovery/start', recovery.start, name="manage_recovery_codes"), - url(r'recovery/getTokenLeft', recovery.getTokenLeft, name="get_recovery_token_left"), - url(r'recovery/genTokens', recovery.genTokens, name="regen_recovery_tokens"), - url(r'recovery/auth', recovery.auth, name="recovery_auth"), - url(r'recovery/recheck', recovery.recheck, name="recovery_recheck"), - - url(r'email/start/', Email.start , name="start_email"), - url(r'email/auth/', Email.auth , name="email_auth"), - - url(r'u2f/$', U2F.start, name="start_u2f"), - url(r'u2f/bind', U2F.bind, name="bind_u2f"), - url(r'u2f/auth', U2F.auth, name="u2f_auth"), - url(r'u2f/process_recheck', U2F.process_recheck, name="u2f_recheck"), - url(r'u2f/verify', U2F.verify, name="u2f_verify"), +# app_name='mfa' - url(r'fido2/$', FIDO2.start, name="start_fido2"), - url(r'fido2/auth', FIDO2.auth, name="fido2_auth"), - url(r'fido2/begin_auth', FIDO2.authenticate_begin, name="fido2_begin_auth"), - url(r'fido2/complete_auth', FIDO2.authenticate_complete, name="fido2_complete_auth"), - url(r'fido2/begin_reg', FIDO2.begin_registeration, name="fido2_begin_reg"), - url(r'fido2/complete_reg', FIDO2.complete_reg, name="fido2_complete_reg"), - url(r'fido2/recheck', FIDO2.recheck, name="fido2_recheck"), - - - url(r'td/$', TrustedDevice.start, name="start_td"), - url(r'td/add', TrustedDevice.add, name="add_td"), - url(r'td/send_link', TrustedDevice.send_email, name="td_sendemail"), - url(r'td/get-ua', TrustedDevice.getUserAgent, name="td_get_useragent"), - url(r'td/trust', TrustedDevice.trust_device, name="td_trust_device"), - url(r'u2f/checkTrusted', TrustedDevice.checkTrusted, name="td_checkTrusted"), - url(r'u2f/secure_device', TrustedDevice.getCookie, name="td_securedevice"), +try: + from django.urls import re_path as url +except ImportError: + from django.conf.urls import url # pyre-ignore[21] - url(r'^$', views.index, name="mfa_home"), - url(r'goto/(.*)', views.goto, name="mfa_goto"), - url(r'selct_method', views.show_methods, name="mfa_methods_list"), - url(r'recheck', helpers.recheck, name="mfa_recheck"), - url(r'toggleKey', views.toggleKey, name="toggle_key"), - url(r'delete', views.delKey, name="mfa_delKey"), - url(r'reset', views.reset_cookie, name="mfa_reset_cookie"), - ] -# print(urlpatterns) \ No newline at end of file +urlpatterns = [ + url(r"totp/start/", totp.start, name="start_new_otop"), + url(r"totp/getToken", totp.getToken, name="get_new_otop"), + url(r"totp/verify", totp.verify, name="verify_otop"), + url(r"totp/auth", totp.auth, name="totp_auth"), + url(r"totp/recheck", totp.recheck, name="totp_recheck"), + url(r"recovery/start", recovery.start, name="manage_recovery_codes"), + url( + r"recovery/getTokenLeft", recovery.getTokenLeft, name="get_recovery_token_left" + ), + url(r"recovery/genTokens", recovery.genTokens, name="regen_recovery_tokens"), + url(r"recovery/auth", recovery.auth, name="recovery_auth"), + url(r"recovery/recheck", recovery.recheck, name="recovery_recheck"), + url(r"email/start/", Email.start, name="start_email"), + url(r"email/auth/", Email.auth, name="email_auth"), + url(r"u2f/$", U2F.start, name="start_u2f"), + url(r"u2f/bind", U2F.bind, name="bind_u2f"), + url(r"u2f/auth", U2F.auth, name="u2f_auth"), + url(r"u2f/process_recheck", U2F.process_recheck, name="u2f_recheck"), + url(r"u2f/verify", U2F.verify, name="u2f_verify"), + url(r"fido2/$", FIDO2.start, name="start_fido2"), + url(r"fido2/auth", FIDO2.auth, name="fido2_auth"), + url(r"fido2/begin_auth", FIDO2.authenticate_begin, name="fido2_begin_auth"), + url( + r"fido2/complete_auth", FIDO2.authenticate_complete, name="fido2_complete_auth" + ), + url(r"fido2/begin_reg", FIDO2.begin_registeration, name="fido2_begin_reg"), + url(r"fido2/complete_reg", FIDO2.complete_reg, name="fido2_complete_reg"), + url(r"fido2/recheck", FIDO2.recheck, name="fido2_recheck"), + url(r"td/$", TrustedDevice.start, name="start_td"), + url(r"td/add", TrustedDevice.add, name="add_td"), + url(r"td/send_link", TrustedDevice.send_email, name="td_sendemail"), + url(r"td/get-ua", TrustedDevice.getUserAgent, name="td_get_useragent"), + url(r"td/trust", TrustedDevice.trust_device, name="td_trust_device"), + url(r"u2f/checkTrusted", TrustedDevice.checkTrusted, name="td_checkTrusted"), + url(r"u2f/secure_device", TrustedDevice.getCookie, name="td_securedevice"), + url(r"^$", views.index, name="mfa_home"), + url(r"goto/(.*)", views.goto, name="mfa_goto"), + url(r"selct_method", views.show_methods, name="mfa_methods_list"), + url(r"recheck", helpers.recheck, name="mfa_recheck"), + url(r"toggleKey", views.toggleKey, name="toggle_key"), + url(r"delete", views.delKey, name="mfa_delKey"), + url(r"reset", views.reset_cookie, name="mfa_reset_cookie"), +] +# print(urlpatterns) diff --git a/mfa/views.py b/mfa/views.py index 819dae5..7ef8b81 100644 --- a/mfa/views.py +++ b/mfa/views.py @@ -1,102 +1,116 @@ import importlib from django.shortcuts import render -from django.http import HttpResponse,HttpResponseRedirect -from .models import * +from django.http import HttpResponse, HttpResponseRedirect + + try: from django.urls import reverse except: - from django.core.urlresolvers import reverse -from django.template.context_processors import csrf -from django.template.context import RequestContext -from django.conf import settings -from . import TrustedDevice + from django.core.urlresolvers import reverse # pyre-ignore[21] from django.contrib.auth.decorators import login_required +from django.conf import settings from user_agents import parse +from . import TrustedDevice +from .models import User_Keys + @login_required def index(request): - keys=[] - context={"keys":User_Keys.objects.filter(username=request.user.username),"UNALLOWED_AUTHEN_METHODS":settings.MFA_UNALLOWED_METHODS - ,"HIDE_DISABLE":getattr(settings,"MFA_HIDE_DISABLE",[]),'RENAME_METHODS':getattr(settings,'MFA_RENAME_METHODS',{})} + keys = [] + context = { + "keys": User_Keys.objects.filter(username=request.user.username), + "UNALLOWED_AUTHEN_METHODS": settings.MFA_UNALLOWED_METHODS, + "HIDE_DISABLE": getattr(settings, "MFA_HIDE_DISABLE", []), + "RENAME_METHODS": getattr(settings, "MFA_RENAME_METHODS", {}), + } for k in context["keys"]: - k.name = getattr(settings,'MFA_RENAME_METHODS',{}).get(k.key_type,k.key_type) - if k.key_type =="Trusted Device": - setattr(k,"device",parse(k.properties.get("user_agent","-----"))) + k.name = getattr(settings, "MFA_RENAME_METHODS", {}).get(k.key_type, k.key_type) + if k.key_type == "Trusted Device": + setattr(k, "device", parse(k.properties.get("user_agent", "-----"))) elif k.key_type == "FIDO2": - setattr(k,"device",k.properties.get("type","----")) + setattr(k, "device", k.properties.get("type", "----")) elif k.key_type == "RECOVERY": context["recovery"] = k continue keys.append(k) - context["keys"]=keys - return render(request,"MFA.html",context) + context["keys"] = keys + return render(request, "MFA.html", context) -def verify(request,username): + +def verify(request, username): request.session["base_username"] = username - #request.session["base_password"] = password - keys=User_Keys.objects.filter(username=username,enabled=1) - methods=list(set([k.key_type for k in keys])) + # request.session["base_password"] = password + keys = User_Keys.objects.filter(username=username, enabled=1) + methods = list(set([k.key_type for k in keys])) - if "Trusted Device" in methods and not request.session.get("checked_trusted_device",False): + if "Trusted Device" in methods and not request.session.get( + "checked_trusted_device", False + ): if TrustedDevice.verify(request): return login(request) methods.remove("Trusted Device") request.session["mfa_methods"] = methods - if len(methods)==1: - return HttpResponseRedirect(reverse(methods[0].lower()+"_auth")) - if getattr(settings,"MFA_ALWAYS_GO_TO_LAST_METHOD",False): + if len(methods) == 1: + return HttpResponseRedirect(reverse(methods[0].lower() + "_auth")) + if getattr(settings, "MFA_ALWAYS_GO_TO_LAST_METHOD", False): keys = keys.exclude(last_used__isnull=True).order_by("last_used") - if keys.count()>0: + if keys.count() > 0: return HttpResponseRedirect(reverse(keys[0].key_type.lower() + "_auth")) return show_methods(request) + def show_methods(request): - return render(request,"select_mfa_method.html", {'RENAME_METHODS':getattr(settings,'MFA_RENAME_METHODS',{})}) + return render( + request, + "select_mfa_method.html", + {"RENAME_METHODS": getattr(settings, "MFA_RENAME_METHODS", {})}, + ) + def reset_cookie(request): - response=HttpResponseRedirect(settings.LOGIN_URL) + response = HttpResponseRedirect(settings.LOGIN_URL) response.delete_cookie("base_username") return response + def login(request): - from django.contrib import auth - from django.conf import settings callable_func = __get_callable_function__(settings.MFA_LOGIN_CALLBACK) - return callable_func(request,username=request.session["base_username"]) + return callable_func(request, username=request.session["base_username"]) @login_required def delKey(request): - key=User_Keys.objects.get(id=request.GET["id"]) + key = User_Keys.objects.get(id=request.GET["id"]) if key.username == request.user.username: key.delete() return HttpResponse("Deleted Successfully") else: return HttpResponse("Error: You own this token so you can't delete it") + def __get_callable_function__(func_path): - import importlib - if not '.' in func_path: + if not "." in func_path: raise Exception("class Name should include modulename.classname") parsed_str = func_path.split(".") - module_name , func_name = ".".join(parsed_str[:-1]) , parsed_str[-1] + module_name, func_name = ".".join(parsed_str[:-1]), parsed_str[-1] imported_module = importlib.import_module(module_name) - callable_func = getattr(imported_module,func_name) + callable_func = getattr(imported_module, func_name) if not callable_func: raise Exception("Module does not have requested function") return callable_func + @login_required def toggleKey(request): - id=request.GET["id"] - q=User_Keys.objects.filter(username=request.user.username, id=id) - if q.count()==1: - key=q[0] + id = request.GET["id"] + q = User_Keys.objects.filter(username=request.user.username, id=id) + if q.count() == 1: + key = q[0] if not key.key_type in settings.MFA_HIDE_DISABLE: - key.enabled=not key.enabled + key.enabled = not key.enabled key.save() return HttpResponse("OK") else: @@ -104,5 +118,6 @@ def toggleKey(request): else: return HttpResponse("Error") -def goto(request,method): - return HttpResponseRedirect(reverse(method.lower()+"_auth")) + +def goto(request, method): + return HttpResponseRedirect(reverse(method.lower() + "_auth")) diff --git a/requirements.txt b/requirements.txt index ba3a23f..8c07f3b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,11 @@ django >= 2.2 -jsonfield simplejson pyotp python-u2flib-server ua-parser user-agents python-jose -fido2 == 1.0.0 +fido2 > 1.1.0,<2.0 jsonLookup +raven +pyre-check diff --git a/setup.py b/setup.py index 19822e9..5849c5b 100644 --- a/setup.py +++ b/setup.py @@ -3,34 +3,33 @@ from setuptools import find_packages, setup setup( - name='django-mfa2', - version='2.8.0', - description='Allows user to add 2FA to their accounts', + name="django-mfa2", + version="2.9.0", + description="Allows user to add 2FA to their accounts", long_description=open("README.md").read(), long_description_content_type="text/markdown", - - author='Mohamed El-Kalioby', - author_email = 'mkalioby@mkalioby.com', - url = 'https://github.com/mkalioby/django-mfa2/', - download_url='https://github.com/mkalioby/django-mfa2/', - license='MIT', + author="Mohamed El-Kalioby", + author_email="mkalioby@mkalioby.com", + url="https://github.com/mkalioby/django-mfa2/", + download_url="https://github.com/mkalioby/django-mfa2/", + license="MIT", packages=find_packages(), install_requires=[ - 'django >= 2.0', - 'simplejson', - 'pyotp', - 'python-u2flib-server', - 'ua-parser', - 'user-agents', - 'python-jose', - 'fido2 == 1.0.0', - ], + "django >= 2.0", + "simplejson", + "pyotp", + "python-u2flib-server", + "ua-parser", + "user-agents", + "python-jose", + "fido2 >= 1.1.0", + ], python_requires=">=3.5", include_package_data=True, - zip_safe=False, # because we're including static files + zip_safe=False, # because we're including static files classifiers=[ "Development Status :: 5 - Production/Stable", - #"Development Status :: 4 - Beta", + # "Development Status :: 4 - Beta", "Environment :: Web Environment", "Framework :: Django", "Framework :: Django :: 2.0", @@ -53,5 +52,5 @@ "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Topic :: Software Development :: Libraries :: Python Modules", -] + ], )