forked from keitaroinc/ckanext-saml2auth
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request keitaroinc#43 from okfn/isaml2auht-interface
ISaml2Auth interface
- Loading branch information
Showing
7 changed files
with
377 additions
and
113 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
from ckan.plugins.interfaces import Interface | ||
|
||
|
||
class ISaml2Auth(Interface): | ||
u''' | ||
This interface allows plugins to hook into the Saml2 authorization flow | ||
''' | ||
def before_saml2_user_update(self, user_dict, saml_attributes): | ||
u''' | ||
Called just before updating an existing user | ||
:param user_dict: User metadata dict that will be passed to user_update | ||
:param saml_attributes: A dict containing extra SAML attributes returned | ||
as part of the SAML Response | ||
''' | ||
pass | ||
|
||
def before_saml2_user_create(self, user_dict, saml_attributes): | ||
u''' | ||
Called just before creating a new user | ||
:param user_dict: User metadata dict that will be passed to user_create | ||
:param saml_attributes: A dict containing extra SAML attributes returned | ||
as part of the SAML Response | ||
''' | ||
pass | ||
|
||
def after_saml2_login(self, resp, saml_attributes): | ||
u''' | ||
Called once the user has been logged in programatically, just before | ||
returning the request. The logged in user can be accessed using g.user | ||
or g.userobj | ||
It should always return the provided response object (which can be of course | ||
modified) | ||
:param resp: A Flask response object. Can be used to issue | ||
redirects, add headers, etc | ||
:param saml_attributes: A dict containing extra SAML attributes returned | ||
as part of the SAML Response | ||
''' | ||
return resp |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
import os | ||
from collections import defaultdict | ||
|
||
import pytest | ||
|
||
import ckan.model as model | ||
import ckan.plugins as plugins | ||
from ckan.tests import factories | ||
|
||
from ckanext.saml2auth.interfaces import ISaml2Auth | ||
from ckanext.saml2auth.tests.test_blueprint_get_request import ( | ||
_prepare_unsigned_response | ||
) | ||
|
||
here = os.path.dirname(os.path.abspath(__file__)) | ||
extras_folder = os.path.join(here, 'extras') | ||
|
||
|
||
class ExampleISaml2AuthPlugin(plugins.SingletonPlugin): | ||
|
||
plugins.implements(ISaml2Auth, inherit=True) | ||
|
||
def __init__(self, *args, **kwargs): | ||
|
||
self.calls = defaultdict(int) | ||
|
||
def before_saml2_user_update(self, user_dict, saml_attributes): | ||
|
||
self.calls['before_saml2_user_update'] += 1 | ||
|
||
user_dict['fullname'] += ' TEST UPDATE' | ||
|
||
user_dict['plugin_extras']['my_plugin'] = {} | ||
user_dict['plugin_extras']['my_plugin']['key1'] = 'value1' | ||
|
||
def before_saml2_user_create(self, user_dict, saml_attributes): | ||
|
||
self.calls['before_saml2_user_create'] += 1 | ||
|
||
user_dict['fullname'] += ' TEST CREATE' | ||
|
||
user_dict['plugin_extras']['my_plugin'] = {} | ||
user_dict['plugin_extras']['my_plugin']['key2'] = 'value2' | ||
|
||
def after_saml2_login(self, resp, saml_attributes): | ||
|
||
self.calls['after_saml2_login'] += 1 | ||
|
||
resp.headers['X-Custom-header'] = 'test' | ||
|
||
return resp | ||
|
||
|
||
@pytest.mark.usefixtures(u'clean_db', u'with_plugins') | ||
@pytest.mark.ckan_config(u'ckan.plugins', u'saml2auth') | ||
@pytest.mark.ckan_config(u'ckanext.saml2auth.entity_id', u'urn:gov:gsa:SAML:2.0.profiles:sp:sso:test:entity') | ||
@pytest.mark.ckan_config(u'ckanext.saml2auth.idp_metadata.location', u'local') | ||
@pytest.mark.ckan_config(u'ckanext.saml2auth.idp_metadata.local_path', os.path.join(extras_folder, 'provider0', 'idp.xml')) | ||
@pytest.mark.ckan_config(u'ckanext.saml2auth.want_response_signed', u'False') | ||
@pytest.mark.ckan_config(u'ckanext.saml2auth.want_assertions_signed', u'False') | ||
@pytest.mark.ckan_config(u'ckanext.saml2auth.want_assertions_or_response_signed', u'False') | ||
class TestInterface(object): | ||
|
||
def test_after_login_is_called(self, app): | ||
|
||
encoded_response = _prepare_unsigned_response() | ||
url = '/acs' | ||
|
||
data = { | ||
'SAMLResponse': encoded_response | ||
} | ||
|
||
with plugins.use_plugin("test_saml2auth") as plugin: | ||
response = app.post(url=url, params=data, follow_redirects=False) | ||
assert 302 == response.status_code | ||
|
||
assert plugin.calls["after_saml2_login"] == 1, plugin.calls | ||
|
||
assert response.headers['X-Custom-header'] == 'test' | ||
|
||
def test_before_create_is_called(self, app): | ||
|
||
encoded_response = _prepare_unsigned_response() | ||
url = '/acs' | ||
|
||
data = { | ||
'SAMLResponse': encoded_response | ||
} | ||
|
||
with plugins.use_plugin("test_saml2auth") as plugin: | ||
response = app.post(url=url, params=data, follow_redirects=False) | ||
assert 302 == response.status_code | ||
|
||
assert plugin.calls["before_saml2_user_create"] == 1, plugin.calls | ||
|
||
user = model.User.by_email('[email protected]')[0] | ||
|
||
assert user.fullname.endswith('TEST CREATE') | ||
|
||
assert user.plugin_extras['my_plugin']['key2'] == 'value2' | ||
|
||
assert 'saml_id' in user.plugin_extras['saml2auth'] | ||
|
||
def test_before_update_is_called_on_saml_user(self, app): | ||
|
||
# From unsigned0.xml | ||
saml_id = '_ce3d2948b4cf20146dee0a0b3dd6f69b6cf86f62d7' | ||
|
||
user = factories.User( | ||
email='[email protected]', | ||
plugin_extras={ | ||
'saml2auth': { | ||
'saml_id': saml_id, | ||
} | ||
} | ||
) | ||
|
||
encoded_response = _prepare_unsigned_response() | ||
url = '/acs' | ||
|
||
data = { | ||
'SAMLResponse': encoded_response | ||
} | ||
|
||
with plugins.use_plugin("test_saml2auth") as plugin: | ||
response = app.post(url=url, params=data, follow_redirects=False) | ||
assert 302 == response.status_code | ||
|
||
assert plugin.calls["before_saml2_user_update"] == 1, plugin.calls | ||
|
||
user = model.User.by_email('[email protected]')[0] | ||
|
||
assert user.fullname.endswith('TEST UPDATE') | ||
|
||
assert user.plugin_extras['my_plugin']['key1'] == 'value1' | ||
|
||
assert user.plugin_extras['saml2auth']['saml_id'] == saml_id | ||
|
||
def test_before_update_is_called_on_ckan_user(self, app): | ||
|
||
user = factories.User( | ||
email='[email protected]', | ||
) | ||
|
||
encoded_response = _prepare_unsigned_response() | ||
url = '/acs' | ||
|
||
data = { | ||
'SAMLResponse': encoded_response | ||
} | ||
|
||
with plugins.use_plugin("test_saml2auth") as plugin: | ||
response = app.post(url=url, params=data, follow_redirects=False) | ||
assert 302 == response.status_code | ||
|
||
assert plugin.calls["before_saml2_user_update"] == 1, plugin.calls | ||
|
||
user = model.User.by_email('[email protected]')[0] | ||
|
||
assert user.fullname.endswith('TEST UPDATE') | ||
|
||
assert 'saml_id' in user.plugin_extras['saml2auth'] |
Oops, something went wrong.