From 0e75eb74cc30b0b9960248faf0b120e1ecbd7058 Mon Sep 17 00:00:00 2001 From: Neyts Zupan Date: Fri, 8 Nov 2024 18:55:08 +0000 Subject: [PATCH] Add bypass of herokuapp access restriction (#85) Refs https://github.com/teamniteo/operations/issues/2359 --- README.rst | 5 +- pyramid_heroku/herokuapp_access.py | 25 +++++++++- pyramid_heroku/tests/test_herokuapp_access.py | 46 ++++++++++++++++++- 3 files changed, 73 insertions(+), 3 deletions(-) diff --git a/README.rst b/README.rst index 0668dcd..938e1aa 100644 --- a/README.rst +++ b/README.rst @@ -47,7 +47,10 @@ Usage example for tweens:: The ``pyramid_heroku.herokuapp_access`` tween depends on ``pyramid_heroku.client_addr`` tween and it requires you to list allowlisted IPs -in the ``pyramid_heroku.herokuapp_allowlist`` setting. +in the ``pyramid_heroku.herokuapp_allowlist`` setting. A bypass is possible +by setting the `HEROKUAPP_ACCESS_BYPASS` environment variable to a secret value +and then sending a request with the `HEROKUAPP_ACCESS_BYPASS` header set to the +same secret value. The ``pyramid_heroku.client_addr`` tween sets request.client_addr to an IP we can trust. It handles IP spoofing via ``X-Forwarded-For`` headers and diff --git a/pyramid_heroku/herokuapp_access.py b/pyramid_heroku/herokuapp_access.py index 980e9d0..fd61d30 100644 --- a/pyramid_heroku/herokuapp_access.py +++ b/pyramid_heroku/herokuapp_access.py @@ -3,10 +3,14 @@ from pyramid.response import Response import logging +import os def includeme(config): - config.add_tween("pyramid_heroku.herokuapp_access.HerokuappAccess") + config.add_tween( + "pyramid_heroku.herokuapp_access.HerokuappAccess", + under="pyramid_heroku.client_addr.ClientAddr", + ) class HerokuappAccess(object): @@ -20,6 +24,8 @@ class HerokuappAccess(object): tween. """ + import os + def __init__(self, handler, registry): self.handler = handler self.registry = registry @@ -30,6 +36,23 @@ def __call__(self, request): "pyramid_heroku.herokuapp_allowlist", "" ).split("\n") + if os.environ.get("HEROKUAPP_ACCESS_BYPASS"): + if request.headers.get("HEROKUAPP_ACCESS_BYPASS") == os.environ.get( + "HEROKUAPP_ACCESS_BYPASS" + ): + if request.registry.settings.get("pyramid_heroku.structlog"): + import structlog + + logger = structlog.getLogger(__name__) + logger.info( + "Herokuapp access bypassed", user_ip=request.client_addr + ) + else: + logger = logging.getLogger(__name__) + logger.info(f"Herokuapp access bypassed by {request.client_addr}") + + return self.handler(request) + if ( "herokuapp.com" in request.headers["Host"] and request.client_addr not in allowlisted_ips diff --git a/pyramid_heroku/tests/test_herokuapp_access.py b/pyramid_heroku/tests/test_herokuapp_access.py index e94a59b..62033d3 100644 --- a/pyramid_heroku/tests/test_herokuapp_access.py +++ b/pyramid_heroku/tests/test_herokuapp_access.py @@ -5,6 +5,7 @@ import logging import mock +import os import structlog import unittest @@ -66,7 +67,7 @@ def test_non_allowlisted_ip(self): assert not self.handler.called, "handler should not be called" self.assertEqual(len(tweens_handler.records), 1) self.assertEqual( - "Denied Herokuapp access for Host foo.herokuapp.com and IP 6.6.6.6", # noqa + "Denied Herokuapp access for Host foo.herokuapp.com and IP 6.6.6.6", tweens_handler.records[0].msg, ) self.assertEqual(response.status_code, 403) @@ -101,3 +102,46 @@ def test_herokuapp_allowlist_empty(self): HerokuappAccess(self.handler, self.request.registry)(self.request) assert not self.handler.called, "handler should not be called" + + @mock.patch.dict(os.environ, {"HEROKUAPP_ACCESS_BYPASS": "foo"}) + def test_herokuapp_access_bypass(self): + "The IP check can be bypassed by setting a correct header." + from pyramid_heroku.herokuapp_access import HerokuappAccess + + self.request.client_addr = "6.6.6.6" + self.request.headers = { + "Host": "foo.herokuapp.com", + "HEROKUAPP_ACCESS_BYPASS": "foo", + } + + # structlog version + HerokuappAccess(self.handler, self.request.registry)(self.request) + self.handler.assert_called_with(self.request) + self.assertEqual(len(tweens_handler.records), 1) + self.assertEqual("Herokuapp access bypassed", tweens_handler.records[0].msg) + + # standard logging version + self.request.registry.settings["pyramid_heroku.structlog"] = False + tweens_handler.clear() + HerokuappAccess(self.handler, self.request.registry)(self.request) + self.handler.assert_called_with(self.request) + self.assertEqual(len(tweens_handler.records), 1) + self.assertEqual( + "Herokuapp access bypassed by 6.6.6.6", + tweens_handler.records[0].msg, + ) + + @mock.patch.dict(os.environ, {"HEROKUAPP_ACCESS_BYPASS": "foo"}) + def test_herokuapp_access_bypass_invalid(self): + "Invalid bypass code is rejected." + from pyramid_heroku.herokuapp_access import HerokuappAccess + + self.request.client_addr = "6.6.6.6" + self.request.headers = { + "Host": "foo.herokuapp.com", + "HEROKUAPP_ACCESS_BYPASS": "bar", + } + self.request.registry.settings = {} + + HerokuappAccess(self.handler, self.request.registry)(self.request) + assert not self.handler.called, "handler should not be called"