Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for TLS/SSL mutual authentication #396

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,12 @@ Extended handlers
When True requires SSL/TLS to be established on the data channel. This
means the user will have to issue PROT before PASV or PORT (default
``False``).

.. data:: client_certfile

The path of the certificate to check the client certificate against. Must be provided
when tls_mutual_authentication is set to ``True`` (default ``None``).

Extended authorizers
--------------------

Expand Down
28 changes: 28 additions & 0 deletions pyftpdlib/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@

try:
from OpenSSL import SSL # requires "pip install pyopenssl"
from OpenSSL.SSL import OP_NO_TICKET
from OpenSSL.SSL import SESS_CACHE_OFF
from OpenSSL.SSL import VERIFY_CLIENT_ONCE
from OpenSSL.SSL import VERIFY_FAIL_IF_NO_PEER_CERT
from OpenSSL.SSL import VERIFY_PEER
except ImportError:
SSL = None

Expand Down Expand Up @@ -3414,6 +3419,8 @@ class TLS_FTPHandler(SSLConnection, FTPHandler):
certfile = None
keyfile = None
ssl_protocol = SSL.SSLv23_METHOD
# client certificate configurable attributes
client_certfile = None
# - SSLv2 is easily broken and is considered harmful and dangerous
# - SSLv3 has several problems and is now dangerous
# - Disable compression to prevent CRIME attacks for OpenSSL 1.0+
Expand Down Expand Up @@ -3446,10 +3453,23 @@ def __init__(self, conn, server, ioloop=None):
self._pbsz = False
self._prot = False
self.ssl_context = self.get_ssl_context()
if self.client_certfile is not None:
self.ssl_context.set_verify(VERIFY_PEER |
VERIFY_FAIL_IF_NO_PEER_CERT |
VERIFY_CLIENT_ONCE,
self.verify_certs_callback)

def __repr__(self):
return FTPHandler.__repr__(self)

# Cannot be @classmethod, need instance to log
def verify_certs_callback(self, connection, x509, errnum, errdepth, ok):
if not ok:
self.log("Bad client certificate detected.")
else:
self.log("Client certificate is valid.")
return ok
Copy link
Owner

@giampaolo giampaolo Aug 24, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is called when a client connects, as I suppose, then this should be a classmethod.


@classmethod
def get_ssl_context(cls):
if cls.ssl_context is None:
Expand All @@ -3464,6 +3484,14 @@ def get_ssl_context(cls):
if not cls.keyfile:
cls.keyfile = cls.certfile
cls.ssl_context.use_privatekey_file(cls.keyfile)
if cls.client_certfile is not None:
cls.ssl_context.set_verify(VERIFY_PEER |
VERIFY_FAIL_IF_NO_PEER_CERT |
VERIFY_CLIENT_ONCE,
cls.verify_certs_callback)
cls.ssl_context.load_verify_locations(cls.client_certfile)
cls.ssl_context.set_session_cache_mode(SESS_CACHE_OFF)
cls.ssl_options = cls.ssl_options | OP_NO_TICKET
if cls.ssl_options:
cls.ssl_context.set_options(cls.ssl_options)
return cls.ssl_context
Expand Down
2 changes: 1 addition & 1 deletion pyftpdlib/test/test_functional_ssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class TestCornerCasesTLSMixin(TLSTestMixin, TestCornerCases):

@unittest.skipUnless(FTPS_SUPPORT, FTPS_UNSUPPORT_REASON)
class TestFTPS(unittest.TestCase):
"""Specific tests fot TSL_FTPHandler class."""
"""Specific tests for TLS_FTPHandler class."""

def setUp(self):
self.server = FTPSServer()
Expand Down
150 changes: 150 additions & 0 deletions pyftpdlib/test/test_functional_ssl_client_certfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#!/usr/bin/env python

# Copyright (C) 2007-2016 Giampaolo Rodola' <[email protected]>.
# Use of this source code is governed by MIT license that can be
# found in the LICENSE file.

import contextlib
import ftplib
import os
import socket
import sys
import ssl

import OpenSSL # requires "pip install pyopenssl"

from pyftpdlib.handlers import TLS_FTPHandler
from pyftpdlib.test import configure_logging
from pyftpdlib.test import PASSWD
from pyftpdlib.test import remove_test_files
from pyftpdlib.test import ThreadedTestFTPd
from pyftpdlib.test import TIMEOUT
from pyftpdlib.test import TRAVIS
from pyftpdlib.test import unittest
from pyftpdlib.test import USER
from pyftpdlib.test import VERBOSITY
from pyftpdlib.test.test_functional import TestCallbacks
from pyftpdlib.test.test_functional import TestConfigurableOptions
from pyftpdlib.test.test_functional import TestCornerCases
from pyftpdlib.test.test_functional import TestFtpAbort
from pyftpdlib.test.test_functional import TestFtpAuthentication
from pyftpdlib.test.test_functional import TestFtpCmdsSemantic
from pyftpdlib.test.test_functional import TestFtpDummyCmds
from pyftpdlib.test.test_functional import TestFtpFsOperations
from pyftpdlib.test.test_functional import TestFtpListingCmds
from pyftpdlib.test.test_functional import TestFtpRetrieveData
from pyftpdlib.test.test_functional import TestFtpStoreData
from pyftpdlib.test.test_functional import TestIPv4Environment
from pyftpdlib.test.test_functional import TestIPv6Environment
from pyftpdlib.test.test_functional import TestSendfile
from pyftpdlib.test.test_functional import TestTimeouts
from _ssl import SSLError


FTPS_SUPPORT = hasattr(ftplib, 'FTP_TLS')
if sys.version_info < (2, 7):
FTPS_UNSUPPORT_REASON = "requires python 2.7+"
else:
FTPS_UNSUPPORT_REASON = "FTPS test skipped"

CERTFILE = os.path.abspath(os.path.join(os.path.dirname(__file__),
'keycert.pem'))
CLIENT_CERTFILE = os.path.abspath(os.path.join(os.path.dirname(__file__),
'clientcert.pem'))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you forgot to add this clientcert.pem file to the PR?


del OpenSSL


if FTPS_SUPPORT:
class FTPSClient(ftplib.FTP_TLS):
"""A modified version of ftplib.FTP_TLS class which implicitly
secure the data connection after login().
"""

def login(self, *args, **kwargs):
ftplib.FTP_TLS.login(self, *args, **kwargs)
self.prot_p()

class FTPSServerAuth(ThreadedTestFTPd):
"""A threaded FTPS server that forces client certificate
authentication used for functional testing.
"""
handler = TLS_FTPHandler
handler.certfile = CERTFILE
handler.client_certfile = CLIENT_CERTFILE


# =====================================================================
# dedicated FTPS tests with client authentication
# =====================================================================


@unittest.skipUnless(FTPS_SUPPORT, FTPS_UNSUPPORT_REASON)
class TestFTPS(unittest.TestCase):
"""Specific tests for TLS_FTPHandler class."""

def setUp(self):
self.server = FTPSServerAuth()
self.server.start()

def tearDown(self):
self.client.close()
self.server.stop()

def assertRaisesWithMsg(self, excClass, msg, callableObj, *args, **kwargs):
try:
callableObj(*args, **kwargs)
except excClass as err:
if str(err) == msg:
return
raise self.failureException("%s != %s" % (str(err), msg))
else:
if hasattr(excClass, '__name__'):
excName = excClass.__name__
else:
excName = str(excClass)
raise self.failureException("%s not raised" % excName)

def test_auth_client_cert(self):
self.client = ftplib.FTP_TLS(timeout=TIMEOUT, certfile=CLIENT_CERTFILE)
self.client.connect(self.server.host, self.server.port)
# secured
try:
self.client.login()
except Exception as e:
self.fail("login with certificate should work")

def test_auth_client_nocert(self):
self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
self.client.connect(self.server.host, self.server.port)
try:
self.client.login()
except SSLError as e:
# client should not be able to log in
if "SSLV3_ALERT_HANDSHAKE_FAILURE" in e.reason:
pass
else:
self.fail("Incorrect SSL error with missing client certificate")
else:
self.fail("Client able to log in with no certificate")

def test_auth_client_badcert(self):
self.client = ftplib.FTP_TLS(timeout=TIMEOUT, certfile=CERTFILE)
self.client.connect(self.server.host, self.server.port)
try:
self.client.login()
except Exception as e:
# client should not be able to log in
if "TLSV1_ALERT_UNKNOWN_CA" in e.reason:
pass
else:
self.fail("Incorrect SSL error with bad client certificate")
else:
self.fail("Client able to log in with bad certificate")

configure_logging()
remove_test_files()


if __name__ == '__main__':
unittest.main(verbosity=VERBOSITY)