Skip to content
This repository is currently being migrated. It's locked while the migration is in progress.

Shared pool #12

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ ChangeLog

# pytest files
/.cache
/.pytest_cache

# Build and artifact dirs
/build
Expand Down
39 changes: 25 additions & 14 deletions festung/dbapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from festung.exceptions import OperationalError # NOQA
from festung.exceptions import ProgrammingError # NOQA
from festung.exceptions import Warning # NOQA
import festung.pool
from festung.pool import Pool
import festung.types
# Types have to be on the DBAPI module
from festung.types import BINARY # NOQA
Expand All @@ -34,16 +34,16 @@
from festung.types import Timestamp # NOQA


__all__ = ['connect', 'apilevel', 'paramstyle', 'threadsafety', 'Connection', 'Cursor']
__all__ = ['connect', 'apilevel', 'paramstyle', 'threadsafety', 'Connection', 'Cursor', 'Pool']


apilevel = '2.0'
threadsafety = 3 # Threads may share the module, connections and cursors
paramstyle = 'qmark'

DEFAULT_POOL = Pool()
SCHEME = 'festung'


error_to_exception = {
'data_error': DataError,
'database_error': DatabaseError,
Expand All @@ -57,14 +57,18 @@


class Connection(object):
def __init__(self, url, session=None):
def __init__(self, url, pool=None):
if furl(url).scheme != SCHEME:
raise ValueError("We only support festung:// connections.")
if pool is not None and not isinstance(pool, Pool):
raise ValueError("pool has to be an instance of `festung.Pool`.")
self.url = url
self.pool = festung.pool.get_pool(session)
self.pool = pool or DEFAULT_POOL

def close(self):
self.pool.close()
# External pools should be manually closed and the DEFAULT_POOL shouldn't be closed because
# we always want to re-use connections. Even accross different users/vaults.
pass

def commit(self):
# TODO(Antoine): Implement
Expand All @@ -78,9 +82,9 @@ def cursor(self):
return Cursor(self)

def _request(self, method, **kwargs):
req = requests.Request(method, to_http_url(self.url), **kwargs)
req.headers.update(to_headers(self.url))
return self.pool.request(req)
request = requests.Request(method, to_http_url(self.url), **kwargs)
request.headers.update(to_headers(self.url))
return self.pool.request(request)

def __repr__(self):
return "<Connection({})>".format(no_password_url(self.url))
Expand Down Expand Up @@ -116,7 +120,6 @@ def _generate_description(headers):
return [CursorDescription(*args) for args in all_args]


# FIXME(Antoine): Support contextmanager interface (for .close())
class Cursor(object):
def __init__(self, connection):
self.connection = connection
Expand All @@ -126,6 +129,12 @@ def __init__(self, connection):
self._arraysize = 1
self.lastrowid = 0

def __enter__(self):
return self

def __exit__(self, *args):
self.close()

@property
def description(self):
return self._description
Expand All @@ -148,12 +157,15 @@ def callproc(self, procname, parameters=None):
parameters = parameters or []
raise NotImplementedError

def close(self):
self.connection = None

def drop(self):
self._request('DELETE')

def execute(self, operation, parameters=None):
parameters = parameters or []
data = dict(sql=operation, params=[cast(p) for p in parameters])
data = {'sql': operation, 'params': [cast(p) for p in parameters]}
response = self._request('POST', json=data).json()
self._iter = iter(response['data'])
self.lastrowid = response['last_row_id']
Expand Down Expand Up @@ -213,10 +225,9 @@ def setoutputsize(size, columns=None):
def rownumber(self):
raise NotImplementedError

def close(self):
pass

def _request(self, *args, **kwargs):
if self.connection is None:
raise ProgrammingError("Cursor is already closed.")
try:
return self.connection._request(*args, **kwargs)
except requests.HTTPError as e:
Expand Down
68 changes: 23 additions & 45 deletions festung/pool.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,34 @@
import requests
from requests.adapters import HTTPAdapter
from requests import Session


class SessionPool(object):
"""Pool to manage connections to festung.

This pool expects the session connection to be managed externally. It
doesn't create nor close the TCP connection to festung.

Args:
session (requests.Session): A pre-created, externally managed `Session`
from the requests library.

Attributes:
session (requests.Session): The requests' session that this pool use to
connect to festung.
"""

def __init__(self, session):
self.session = session

def request(self, request):
resp = self.session.send(request.prepare())
resp.raise_for_status()
return resp

def close(self):
pass
DEFAULT_RETRIES = 1


class NewSessionPool(SessionPool):
"""Same as :class:`SessionPool` but instanciate a new `requests.Session`.
class Pool(object):
"""Pool to manage connections to festung.

Args:
session (NoneType): Only accept None as a session (for polymorphism)
max_retries (int): The maximum number of retries each connection
should attempt. Note, this applies only to failed DNS lookups, socket
connections and connection timeouts, never to requests where data has
made it to the server. Defaults to 1.
"""
def __init__(self, session):
if session is not None:
raise TypeError("session should be None")
super(NewSessionPool, self).__init__(requests.Session())

def close(self):
super(NewSessionPool, self).close()
self.session.close()
def __init__(self, max_retries=DEFAULT_RETRIES, **kwargs):
self._session = Session()
self._session.mount('http://', HTTPAdapter(max_retries=max_retries, **kwargs))
self._session.mount('https://', HTTPAdapter(max_retries=max_retries, **kwargs))

def __enter__(self):
return self

def get_pool_class(session):
if session is None:
return NewSessionPool
else:
return SessionPool
def __exit__(self, *args):
self.close()

def request(self, request):
response = self._session.send(request.prepare())
response.raise_for_status()
return response

def get_pool(session):
pool_class = get_pool_class(session)
return pool_class(session)
def close(self):
self._session.close()
25 changes: 14 additions & 11 deletions tests/test_dbapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,25 @@

import pytest
import pytz
import requests

import festung.dbapi
from festung import exceptions
from festung.pool import Pool
from festung.types import Type


@pytest.fixture
def connection_session():
with requests.Session() as session:
yield session
def connection_pool():
with Pool() as pool:
yield pool


@pytest.fixture(params=['managed', 'external'])
def connection_kwargs(request, connection_session):
def connection_kwargs(request, connection_pool):
if request.param == 'managed':
return {}
elif request.param == 'external':
return dict(session=connection_session)
return {'pool': connection_pool}
else:
assert False, "Not all parameters are supported"

Expand All @@ -39,11 +39,14 @@ def connection(database_url, connection_kwargs):

@pytest.fixture
def cursor(connection):
cur = connection.cursor()
try:
yield cur
finally:
cur.close()
with connection.cursor() as cursor:
yield cursor


def test_closed_cursor(cursor):
cursor.close()
with pytest.raises(exceptions.ProgrammingError):
cursor.execute('SELECT 1')


class ResponseFixture(object):
Expand Down