Skip to content

Commit

Permalink
bugfix #1853: garbage collector deletes guest users (#1928)
Browse files Browse the repository at this point in the history
* fixes bug #1853
* cleanup tests, warning, log and adds start banner in webserver
  • Loading branch information
pcrespov authored Nov 4, 2020
1 parent 3e03271 commit 2faf8ab
Show file tree
Hide file tree
Showing 22 changed files with 327 additions and 188 deletions.
7 changes: 5 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,12 @@ prof/
# outputs from make
.stack-*.yml


# Copies
# copies
services/**/.codeclimate.yml

# WSL
.fake_hostname_file
.bash_history

# pytest-fixture-tools output
artifacts
4 changes: 1 addition & 3 deletions packages/service-library/src/servicelib/logging_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,13 @@ def set_logging_handler(
formatting = DEFAULT_FORMATTING
if not formatter_base:
formatter_base = CustomFormatter
for handler in logger.handlers:

# handler = logging.StreamHandler()
for handler in logger.handlers:
handler.setFormatter(
formatter_base(
"%(levelname)s: %(name)s:%(funcName)s(%(lineno)s) - %(message)s"
)
)
# logger.addHandler(handler)


def _log_arguments(
Expand Down
13 changes: 12 additions & 1 deletion services/web/server/src/simcore_service_webserver/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,12 @@
from .__version__ import __version__
import warnings

from ._meta import __version__

#
# NOTE: Some BaseSettings use field aliases (e.g. "version" as an alias for "vtag")
# to facilitate constructing pydantic settings from the names defined in the trafaret
# schemas for the config files. The filter below silences pydantic's warning about
# that alias usage for the whole package.
#
# NOTE(review): `filterwarnings` treats `message` as a regex matched against the
# start of the warning text; the literal dots here act as single-char wildcards,
# which is harmless for this fixed message.
#
warnings.filterwarnings(
    "ignore",
    message='aliases are no longer used by BaseSettings to define which environment variables to read. Instead use the "env" field setting. See https://pydantic-docs.helpmanual.io/usage/settings/#environment-variable-names',
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,15 @@

# legacy
api_version_prefix: str = api_vtag


WELCOME_MSG = r"""
_ _ _
| | | | | |
| | | | ___ | |__ ___ ___ _ __ __ __ ___ _ __
| |/\| | / _ \| '_ \ / __| / _ \| '__|\ \ / // _ \| '__|
\ /\ /| __/| |_) |\__ \| __/| | \ V /| __/| |
\/ \/ \___||_.__/ |___/ \___||_| \_/ \___||_| {0}
""".format(
f"v{__version__}"
)
10 changes: 8 additions & 2 deletions services/web/server/src/simcore_service_webserver/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from servicelib.application import create_safe_application

from ._meta import WELCOME_MSG
from .activity import setup_activity
from .catalog import setup_catalog
from .computation import setup_computation
Expand Down Expand Up @@ -67,14 +68,14 @@ def create_application(config: Dict) -> web.Application:
setup_storage(app)
setup_users(app)
setup_groups(app)
setup_projects(app) # needs storage
setup_studies_access(app)
setup_projects(app)
setup_activity(app)
setup_resource_manager(app)
setup_tags(app)
setup_catalog(app)
setup_publications(app)
setup_products(app)
setup_studies_access(app)

return app

Expand All @@ -85,6 +86,11 @@ def run_service(config: dict):

app = create_application(config)

async def welcome_banner(_app: web.Application):
print(WELCOME_MSG, flush=True)

app.on_startup.append(welcome_banner)

web.run_app(app, host=config["main"]["host"], port=config["main"]["port"])


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from servicelib.rest_responses import wrap_as_envelope
from servicelib.rest_routing import iter_path_operations

from .__version__ import api_version_prefix
from ._meta import api_version_prefix
from .catalog_config import assert_valid_config, get_client_session
from .constants import RQ_PRODUCT_KEY, X_PRODUCT_NAME_HEADER
from .login.decorators import RQT_USERID_KEY, login_required
Expand Down
28 changes: 2 additions & 26 deletions services/web/server/src/simcore_service_webserver/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,12 @@
from argparse import ArgumentParser
from typing import Dict, List, Optional

from aiodebug import log_slow_callbacks
from aiohttp.log import access_logger
from servicelib.logging_utils import set_logging_handler

from .application import run_service
from .application_config import CLI_DEFAULT_CONFIGFILE, app_schema
from .cli_config import add_cli_options, config_from_options
from .log import setup_logging
from .utils import search_osparc_repo_dir

LOG_LEVEL_STEP = logging.CRITICAL - logging.ERROR

log = logging.getLogger(__name__)


Expand Down Expand Up @@ -104,26 +99,7 @@ def main(args: Optional[List] = None):
config = parse(args, parser)

# service log level
log_level = getattr(logging, config["main"]["log_level"])
logging.basicConfig(level=log_level)
logging.root.setLevel(log_level)
set_logging_handler(logging.root)

# aiohttp access log-levels
access_logger.setLevel(log_level)

# keep mostly quiet noisy loggers
quiet_level = max(
min(log_level + LOG_LEVEL_STEP, logging.CRITICAL), logging.WARNING
)
logging.getLogger("engineio").setLevel(quiet_level)
logging.getLogger("openapi_spec_validator").setLevel(quiet_level)
logging.getLogger("sqlalchemy").setLevel(quiet_level)
logging.getLogger("sqlalchemy.engine").setLevel(quiet_level)

# NOTE: Every task blocking > AIODEBUG_SLOW_DURATION_SECS secs is considered slow and logged as warning
slow_duration = float(os.environ.get("AIODEBUG_SLOW_DURATION_SECS", 0.1))
log_slow_callbacks.enable(slow_duration)
setup_logging(level=config["main"]["log_level"])

# run
run_service(config)
45 changes: 45 additions & 0 deletions services/web/server/src/simcore_service_webserver/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
""" Configuration and utilities for service logging
"""
import logging
import os
from typing import Union

from aiodebug import log_slow_callbacks
from aiohttp.log import access_logger

from servicelib.logging_utils import set_logging_handler

LOG_LEVEL_STEP = logging.CRITICAL - logging.ERROR


def setup_logging(*, level: Union[str, int]):
    """Configures logging for the whole service.

    Sets the service-wide log level, installs the custom formatter on the
    root logger's handlers, aligns the aiohttp access logger with the same
    level, quiets known-noisy third-party loggers, and enables aiodebug's
    slow-callback warnings.

    :param level: log level accepted by the stdlib logging module
        (a numeric level or a level name such as "INFO")
    """
    # base configuration + root logger at the requested service level
    logging.basicConfig(level=level)

    root = logging.root
    root.setLevel(level)
    set_logging_handler(root)

    # aiohttp access logs follow the same level as the service
    access_logger.setLevel(level)

    # Noisy third-party loggers are raised by one severity step,
    # clamped into the [WARNING, CRITICAL] range so they stay mostly quiet.
    quiet_level: int = min(root.level + LOG_LEVEL_STEP, logging.CRITICAL)
    quiet_level = max(quiet_level, logging.WARNING)
    for noisy_name in (
        "engineio",
        "openapi_spec_validator",
        "sqlalchemy",
        "sqlalchemy.engine",
    ):
        logging.getLogger(noisy_name).setLevel(quiet_level)

    # NOTE: every task blocking longer than AIODEBUG_SLOW_DURATION_SECS seconds
    # is considered slow by aiodebug and logged as a warning
    slow_duration = float(os.environ.get("AIODEBUG_SLOW_DURATION_SECS", 0.1))
    log_slow_callbacks.enable(slow_duration)


def test_logger_propagation(logger: logging.Logger):
msg = f"TESTING %s log with {logger}"
logger.critical(msg, "critical")
logger.error(msg, "error")
logger.info(msg, "info")
logger.warning(msg, "warning")
logger.debug(msg, "debug")
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pydantic import BaseModel, ValidationError, validator
from servicelib.application_setup import ModuleCategory, app_module_setup

from .__version__ import api_vtag
from ._meta import api_vtag
from .constants import (
APP_DB_ENGINE_KEY,
APP_PRODUCTS_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import trafaret as T
from aiohttp.web import Application
from pydantic import BaseSettings, PositiveInt
from pydantic import BaseSettings, PositiveInt, Field

from models_library.settings.redis import RedisConfig
from servicelib.application_keys import APP_CONFIG_KEY
Expand Down Expand Up @@ -46,8 +46,12 @@ class RedisSection(RedisConfig):
class ResourceManagerSettings(BaseSettings):
enabled: bool = True

resource_deletion_timeout_seconds: Optional[PositiveInt] = 900
garbage_collection_interval_seconds: Optional[PositiveInt] = 30
resource_deletion_timeout_seconds: Optional[PositiveInt] = Field(
900, description="Expiration time (or Time to live (TTL) in redis jargon) for a registered resource"
)
garbage_collection_interval_seconds: Optional[PositiveInt] = Field(
30, description="Waiting time between consecutive runs of the garbage-colector"
)

redis: RedisSection

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,9 @@ async def garbage_collector_task(app: web.Application):
while keep_alive:
logger.info("Starting garbage collector...")
try:
registry = get_registry(app)
interval = get_garbage_collector_interval(app)
while True:
await collect_garbage(registry, app)
await collect_garbage(app)
await asyncio.sleep(interval)

except asyncio.CancelledError:
Expand All @@ -81,29 +80,30 @@ async def garbage_collector_task(app: web.Application):
await asyncio.sleep(5)


async def collect_garbage(registry: RedisResourceRegistry, app: web.Application):
async def collect_garbage(app: web.Application):
"""
Garbage collection has the task of removing trash from the system. The trash
Garbage collection has the task of removing trash (i.e. unused resources) from the system. The trash
can be divided in:
- Websockets & Redis (used to keep track of current active connections)
- GUEST users (used for temporary access to the system which are created on the fly)
- deletion of users. If a user needs to be deleted it is manually marked as GUEST
in the database
- Deletion of users. If a user needs to be deleted it can be set as GUEST in the database
The resources are Redis entries where all information regarding all the
websocket identifiers for all opened tabs across all browsers for each user
websocket identifiers for all opened tabs accross all browser for each user
are stored.
The alive/dead keys are normal Redis keys. To each key and ALIVE key is associated,
which has an assigned TTL. The browser will call the `client_heartbeat` websocket
The alive/dead keys are normal Redis keys. To each key an ALIVE key is associated,
which has an assigned TTL (Time To Live). The browser will call the `client_heartbeat` websocket
endpoint to refresh the TTL, thus declaring that the user (websocket connection) is
still active. The `resource_deletion_timeout_seconds` is theTTL of the key.
still active. The `resource_deletion_timeout_seconds` is the TTL of the key.
The field `garbage_collection_interval_seconds` defines the interval at which this
function will be called.
"""
logger.info("collecting garbage...")
logger.info("Collecting garbage...")

registry: RedisResourceRegistry = get_registry(app)

# Removes disconnected user resources
# Triggers signal to close possible pending opened projects
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async def redis_client(app: web.Application):
client = None
for attempt in Retrying(**retry_upon_init_policy):
with attempt:
client = await aioredis.create_redis_pool(url, encoding="utf-8")
client: aioredis.Redis = await aioredis.create_redis_pool(url, encoding="utf-8")
# create lock manager
lock_manager = Aioredlock([url])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import logging
from typing import Dict, List, Tuple

import aioredis
import attr
from aiohttp import web

Expand Down Expand Up @@ -53,75 +54,74 @@ def _decode_hash_key(cls, hash_key: str) -> Dict[str, str]:
key = dict(x.split("=") for x in tmp_key.split(":"))
return key

@property
def client(self) -> aioredis.Redis:
return get_redis_client(self.app)

async def set_resource(
self, key: Dict[str, str], resource: Tuple[str, str]
) -> None:
client = get_redis_client(self.app)
hash_key = f"{self._hash_key(key)}:{RESOURCE_SUFFIX}"
await client.hmset_dict(hash_key, **{resource[0]: resource[1]})
field, value = resource
await self.client.hmset_dict(hash_key, **{field: value})

async def get_resources(self, key: Dict[str, str]) -> Dict[str, str]:
client = get_redis_client(self.app)
hash_key = f"{self._hash_key(key)}:{RESOURCE_SUFFIX}"
return await client.hgetall(hash_key)
return await self.client.hgetall(hash_key)

async def remove_resource(self, key: Dict[str, str], resource_name: str) -> None:
client = get_redis_client(self.app)
hash_key = f"{self._hash_key(key)}:{RESOURCE_SUFFIX}"
await client.hdel(hash_key, resource_name)
await self.client.hdel(hash_key, resource_name)

async def find_resources(
self, key: Dict[str, str], resource_name: str
) -> List[str]:
client = get_redis_client(self.app)
resources = []
# the key might only be partially complete
partial_hash_key = f"{self._hash_key(key)}:{RESOURCE_SUFFIX}"
async for key in client.iscan(match=partial_hash_key):
if await client.hexists(key, resource_name):
resources.append(await client.hget(key, resource_name))
async for key in self.client.iscan(match=partial_hash_key):
if await self.client.hexists(key, resource_name):
resources.append(await self.client.hget(key, resource_name))
return resources

async def find_keys(self, resource: Tuple[str, str]) -> List[Dict[str, str]]:
keys = []
if not resource:
return keys
client = get_redis_client(self.app)
async for hash_key in client.iscan(match=f"*:{RESOURCE_SUFFIX}"):
if resource[1] == await client.hget(hash_key, resource[0]):

field, value = resource

async for hash_key in self.client.iscan(match=f"*:{RESOURCE_SUFFIX}"):
if value == await self.client.hget(hash_key, field):
keys.append(self._decode_hash_key(hash_key))
return keys

async def set_key_alive(self, key: Dict[str, str], timeout: int) -> None:
# setting the timeout to always expire, timeout > 0
timeout = int(max(1, timeout))
client = get_redis_client(self.app)
hash_key = f"{self._hash_key(key)}:{ALIVE_SUFFIX}"
await client.set(hash_key, 1, expire=timeout)
await self.client.set(hash_key, 1, expire=timeout)

async def is_key_alive(self, key: Dict[str, str]) -> bool:
client = get_redis_client(self.app)
hash_key = f"{self._hash_key(key)}:{ALIVE_SUFFIX}"
return await client.exists(hash_key) > 0
return await self.client.exists(hash_key) > 0

async def remove_key(self, key: Dict[str, str]) -> None:
client = get_redis_client(self.app)
await client.delete(
await self.client.delete(
f"{self._hash_key(key)}:{RESOURCE_SUFFIX}",
f"{self._hash_key(key)}:{ALIVE_SUFFIX}",
)

async def get_all_resource_keys(
self,
) -> Tuple[List[Dict[str, str]], List[Dict[str, str]]]:
client = get_redis_client(self.app)
alive_keys = [
self._decode_hash_key(hash_key)
async for hash_key in client.iscan(match=f"*:{ALIVE_SUFFIX}")
async for hash_key in self.client.iscan(match=f"*:{ALIVE_SUFFIX}")
]
dead_keys = [
self._decode_hash_key(hash_key)
async for hash_key in client.iscan(match=f"*:{RESOURCE_SUFFIX}")
async for hash_key in self.client.iscan(match=f"*:{RESOURCE_SUFFIX}")
if self._decode_hash_key(hash_key) not in alive_keys
]

Expand Down
Loading

0 comments on commit 2faf8ab

Please sign in to comment.