Skip to content

Commit

Permalink
fix(facebook): Inspect POST'ed access token
Browse files Browse the repository at this point in the history
  • Loading branch information
pennersr committed Oct 26, 2023
1 parent e8a1035 commit 2b55add
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 58 deletions.
12 changes: 12 additions & 0 deletions ChangeLog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ Backwards incompatible changes
provider. Both providers were targeting the same goal.


Security notice
---------------

- Facebook: Using the JS SDK flow, it was possible to post valid access tokens
originating from other apps. Facebook user IDs are scoped per app. By default
that user ID (not the email address) is used as key while
authenticating. Therefore, such access tokens can not be abused by
default. However, in case ``SOCIALACCOUNT_EMAIL_AUTHENTICATION`` was
explicitly enabled for the Facebook provider, these tokens could be used to
login.


0.57.0 (2023-09-24)
*******************

Expand Down
10 changes: 10 additions & 0 deletions allauth/account/tests/test_login.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ def test_login_unverified_account_optional(self):
@override_settings(
ACCOUNT_EMAIL_VERIFICATION=app_settings.EmailVerificationMethod.OPTIONAL,
ACCOUNT_LOGIN_ATTEMPTS_LIMIT=3,
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
},
)
def test_login_failed_attempts_exceeded(self):
user = get_user_model().objects.create(username="john")
Expand Down Expand Up @@ -146,6 +151,11 @@ def test_login_failed_attempts_exceeded(self):
ACCOUNT_AUTHENTICATION_METHOD=app_settings.AuthenticationMethod.EMAIL,
ACCOUNT_EMAIL_VERIFICATION=app_settings.EmailVerificationMethod.MANDATORY,
ACCOUNT_LOGIN_ATTEMPTS_LIMIT=1,
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
},
)
def test_login_failed_attempts_exceeded_cleared_on_password_reset(self):
# Ensure that login attempts, once they hit the limit,
Expand Down
10 changes: 10 additions & 0 deletions allauth/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,13 @@ def f():
@pytest.fixture(autouse=True)
def clear_context_request():
context._request_var.set(None)


@pytest.fixture
def enable_cache(settings):
settings.CACHES = {
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
yield
27 changes: 23 additions & 4 deletions allauth/socialaccount/providers/facebook/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,17 @@ def test_login_by_token(self):
with patch(
"allauth.socialaccount.providers.facebook.views.requests"
) as requests_mock:
mocks = [self.get_mocked_response().json()]
requests_mock.get.return_value.json = lambda: mocks.pop()
mocks = [
{"access_token": "app_token"},
{
"data": {
"app_id": "app123id",
"is_valid": True,
}
},
self.get_mocked_response().json(),
]
requests_mock.get.return_value.json = lambda: mocks.pop(0)
resp = self.client.post(
reverse("facebook_login_by_token"),
data={"access_token": "dummy"},
Expand All @@ -114,8 +123,18 @@ def test_login_by_token_reauthenticate(self):
with patch(
"allauth.socialaccount.providers.facebook.views.requests"
) as requests_mock:
mocks = [self.get_mocked_response().json(), {"auth_nonce": nonce}]
requests_mock.get.return_value.json = lambda: mocks.pop()
mocks = [
{"access_token": "app_token"},
{
"data": {
"app_id": "app123id",
"is_valid": True,
}
},
{"auth_nonce": nonce},
self.get_mocked_response().json(),
]
requests_mock.get.return_value.json = lambda: mocks.pop(0)
resp = self.client.post(
reverse("facebook_login_by_token"),
data={"access_token": "dummy"},
Expand Down
156 changes: 103 additions & 53 deletions allauth/socialaccount/providers/facebook/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import requests
from datetime import timedelta

from django import forms
from django.core.cache import cache
from django.core.exceptions import PermissionDenied
from django.utils import timezone
from django.views.generic import View

from allauth.socialaccount import app_settings
from allauth.socialaccount.adapter import get_adapter
Expand Down Expand Up @@ -72,58 +76,104 @@ def complete_login(self, request, app, access_token, **kwargs):
oauth2_callback = OAuth2CallbackView.adapter_view(FacebookOAuth2Adapter)


def login_by_token(request):
ret = None
auth_exception = None
if request.method == "POST":
class LoginByTokenView(View):
def dispatch(self, request):
try:
return super().dispatch(request)
except (
requests.RequestException,
forms.ValidationError,
PermissionDenied,
) as exc:
return render_authentication_error(
request, FacebookProvider.id, exception=exc
)

def get(self, request):
# If we leave out get().get() it will return a response with a 405, but
# we really want to show an authentication error.
raise PermissionDenied("405")

def post(self, request):
form = FacebookConnectForm(request.POST)
if form.is_valid():
try:
adapter = get_adapter()
provider = adapter.get_provider(request, FacebookProvider.id)
login_options = provider.get_fb_login_options(request)
app = provider.app
access_token = form.cleaned_data["access_token"]
expires_at = None
if login_options.get("auth_type") == "reauthenticate":
info = requests.get(
GRAPH_API_URL + "/oauth/access_token_info",
params={
"client_id": app.client_id,
"access_token": access_token,
},
).json()
nonce = provider.get_nonce(request, pop=True)
ok = nonce and nonce == info.get("auth_nonce")
else:
ok = True
if ok and provider.get_settings().get("EXCHANGE_TOKEN"):
resp = requests.get(
GRAPH_API_URL + "/oauth/access_token",
params={
"grant_type": "fb_exchange_token",
"client_id": app.client_id,
"client_secret": app.secret,
"fb_exchange_token": access_token,
},
).json()
access_token = resp["access_token"]
expires_in = resp.get("expires_in")
if expires_in:
expires_at = timezone.now() + timedelta(seconds=int(expires_in))
if ok:
token = SocialToken(
app=app, token=access_token, expires_at=expires_at
)
login = fb_complete_login(request, app, token)
login.token = token
login.state = SocialLogin.state_from_request(request)
ret = complete_social_login(request, login)
except requests.RequestException as e:
logger.exception("Error accessing FB user profile")
auth_exception = e
if not ret:
ret = render_authentication_error(
request, FacebookProvider.id, exception=auth_exception
if not form.is_valid():
raise forms.ValidationError()

adapter = get_adapter()
provider = adapter.get_provider(request, FacebookProvider.id)
login_options = provider.get_fb_login_options(request)
app = provider.app
access_token = form.cleaned_data["access_token"]

self.inspect_token(provider, access_token)

expires_at = None
if login_options.get("auth_type") == "reauthenticate":
info = requests.get(
GRAPH_API_URL + "/oauth/access_token_info",
params={
"client_id": app.client_id,
"access_token": access_token,
},
).json()
nonce = provider.get_nonce(request, pop=True)
ok = nonce and nonce == info.get("auth_nonce")
else:
ok = True
if ok and provider.get_settings().get("EXCHANGE_TOKEN"):
resp = requests.get(
GRAPH_API_URL + "/oauth/access_token",
params={
"grant_type": "fb_exchange_token",
"client_id": app.client_id,
"client_secret": app.secret,
"fb_exchange_token": access_token,
},
).json()
access_token = resp["access_token"]
expires_in = resp.get("expires_in")
if expires_in:
expires_at = timezone.now() + timedelta(seconds=int(expires_in))
if ok:
token = SocialToken(app=app, token=access_token, expires_at=expires_at)
login = fb_complete_login(request, app, token)
login.token = token
login.state = SocialLogin.state_from_request(request)
ret = complete_social_login(request, login)
return ret

def get_app_token(self, provider):
app = provider.app
cache_key = f"allauth.facebook.app_token[{app.client_id}]"
app_token = cache.get(cache_key)
if not app_token:
resp = requests.get(
GRAPH_API_URL + "/oauth/access_token",
params={
"client_id": app.client_id,
"client_secret": app.secret,
"grant_type": "client_credentials",
},
)
resp.raise_for_status()
data = resp.json()
app_token = data["access_token"]
timeout = provider.get_settings().get("APP_TOKEN_CACHE_TIMEOUT", 300)
cache.set(cache_key, app_token, timeout=timeout)
return app_token

def inspect_token(self, provider, input_token):
app_token = self.get_app_token(provider)
resp = requests.get(
GRAPH_API_URL + "/debug_token",
params={"input_token": input_token, "access_token": app_token},
)
return ret
resp.raise_for_status()
data = resp.json()["data"]
if not data["is_valid"]:
raise PermissionDenied("token is not valid")
if data["app_id"] != provider.app.client_id or not data["is_valid"]:
raise PermissionDenied("token app_id mismatch")


login_by_token = LoginByTokenView.as_view()
7 changes: 6 additions & 1 deletion test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,19 @@
},
]

CACHES = {
"default": {
"BACKEND": "django.core.cache.backends.dummy.DummyCache",
}
}

MIDDLEWARE = (
"django.contrib.sessions.middleware.SessionMiddleware",
"django.middleware.common.CommonMiddleware",
"django.middleware.csrf.CsrfViewMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
"django.middleware.clickjacking.XFrameOptionsMiddleware",

"allauth.account.middleware.AccountMiddleware",
)

Expand Down

0 comments on commit 2b55add

Please sign in to comment.