diff --git a/listenbrainz/webserver/views/index.py b/listenbrainz/webserver/views/index.py index aa3705e811..3e18f6af18 100644 --- a/listenbrainz/webserver/views/index.py +++ b/listenbrainz/webserver/views/index.py @@ -1,7 +1,7 @@ import locale -import os import requests import time +import os from brainzutils import cache from datetime import datetime @@ -21,6 +21,7 @@ from listenbrainz.webserver import flash, db_conn, meb_conn, ts_conn from listenbrainz.webserver.timescale_connection import _ts from listenbrainz.webserver.redis_connection import _redis +from listenbrainz.webserver.views.status_api import get_service_status import listenbrainz.db.stats as db_stats import listenbrainz.db.user_relationship as db_user_relationship from listenbrainz.db.donation import get_recent_donors @@ -92,6 +93,7 @@ def current_status(): load = "%.2f %.2f %.2f" % os.getloadavg() + service_status = get_service_status() listen_count = _ts.get_total_listen_count() try: user_count = format(int(_get_user_count()), ',d') @@ -114,6 +116,7 @@ def current_status(): data = { "load": load, + "service-status": service_status, "listenCount": format(int(listen_count), ",d") if listen_count else "0", "userCount": user_count, "listenCountsPerDay": listen_counts_per_day, diff --git a/listenbrainz/webserver/views/stats_api.py b/listenbrainz/webserver/views/stats_api.py index a0f77462ee..649e2efcd5 100644 --- a/listenbrainz/webserver/views/stats_api.py +++ b/listenbrainz/webserver/views/stats_api.py @@ -327,6 +327,15 @@ def _get_entity_stats(user_name: str, entity: str, count_key: str): }}) +def get_entity_stats_last_updated(user_name: str, entity: str, count_key: str): + user, stats_range = _validate_stats_user_params(user_name) + stats = db_stats.get(user["id"], entity, stats_range, EntityRecord) + if stats is None: + return None + + entity_list, total_entity_count = _process_user_entity(stats, 0, 1) + return stats.last_updated + @stats_api_bp.route("/user//listening-activity") @crossdomain @ratelimit() diff --git a/listenbrainz/webserver/views/status_api.py b/listenbrainz/webserver/views/status_api.py index 91ab5ccfaf..89dcbc1be0 100644 --- a/listenbrainz/webserver/views/status_api.py +++ b/listenbrainz/webserver/views/status_api.py @@ -1,8 +1,25 @@ -from flask import Blueprint, request, jsonify +from datetime import datetime +from flask import Blueprint, request, jsonify, current_app +import requests +from time import sleep, time + +from kombu import Connection, Queue, Exchange +from kombu.exceptions import KombuError +from werkzeug.exceptions import ServiceUnavailable + from listenbrainz.webserver.errors import APIBadRequest, APINotFound from brainzutils.ratelimit import ratelimit - +from brainzutils import cache +from listenbrainz.webserver import db_conn, ts_conn +from listenbrainz.db.playlist import get_recommendation_playlists_for_user import listenbrainz.db.dump as db_dump +from listenbrainz.webserver.views.stats_api import get_entity_stats_last_updated + +STATUS_PREFIX = 'listenbrainz.status' # prefix used in key to cache status +CACHE_TIME = 60 * 60 # time in seconds we cache the fetched data +DUMP_CACHE_TIME = 24 * 60 * 60 # time in seconds we cache the dump check +LISTEN_COUNT_CACHE_TIME = 30 * 60 # time in seconds we cache the listen count +PLAYLIST_CACHE_TIME = 24 * 30 * 60 # time in seconds we cache latest playlist timestamp status_api_bp = Blueprint("status_api_v1", __name__) @@ -34,7 +51,7 @@ def get_dump_info(): dump_id = request.args.get("id") if dump_id is None: try: - dump = db_dump.get_dump_entries()[0] # return the latest dump + dump = db_dump.get_dump_entries()[0] # return the latest dump except IndexError: raise APINotFound("No dump entry exists.") else: @@ -64,3 +81,144 @@ def _convert_timestamp_to_string_dump_format(timestamp): String of the format "20190625-170100" """ return timestamp.strftime("%Y%m%d-%H%M%S") + + +def get_stats_timestamp(): + """ Check to see when statistics were last generated for a "random" user. Returns unix epoch timestamp""" + + cache_key = STATUS_PREFIX + ".stats-timestamp" + last_updated = cache.get(cache_key) + if last_updated is None: + last_updated = get_entity_stats_last_updated("rob", "artists", "total_artist_count") + if last_updated is None: + return None + + cache.set(cache_key, last_updated, CACHE_TIME) + + return last_updated + + +def get_playlists_timestamp(): + """ Check to see when recommendations playlists were last generated for a "random" user. Returns unix epoch timestamp""" + + cache_key = STATUS_PREFIX + ".playlist-timestamp" + last_updated = cache.get(cache_key) + if last_updated is None: + playlists = get_recommendation_playlists_for_user(db_conn, ts_conn, 1) + if playlists is None or not playlists: + return None + + last_updated = int(playlists[0].last_updated.timestamp()) + cache.set(cache_key, last_updated, PLAYLIST_CACHE_TIME) + + return last_updated + + +def get_incoming_listens_count(): + """ Check to see how many listens are currently in the incoming queue. Returns an unix epoch timestamp. """ + + cache_key = STATUS_PREFIX + ".incoming_listens" + listen_count = cache.get(cache_key) + if listen_count is None: + current_app.logger.warn("no cached data!") + try: + incoming_exchange = Exchange(current_app.config["INCOMING_EXCHANGE"], "fanout", durable=False) + incoming_queue = Queue(current_app.config["INCOMING_QUEUE"], exchange=incoming_exchange, durable=True) + + with Connection(hostname=current_app.config["RABBITMQ_HOST"], + userid=current_app.config["RABBITMQ_USERNAME"], + port=current_app.config["RABBITMQ_PORT"], + password=current_app.config["RABBITMQ_PASSWORD"], + virtual_host=current_app.config["RABBITMQ_VHOST"]) as conn: + + _, listen_count, _ = incoming_queue.queue_declare(channel=conn.channel(), passive=True) + except KombuError as err: + current_app.logger.error("RabbitMQ is currently not available. Error: %s" % (str(err))) + return None + + cache.set(cache_key, listen_count, LISTEN_COUNT_CACHE_TIME) + + return listen_count + + +def get_dump_timestamp(): + """ Check when the latst dump was generated. """ + + cache_key = STATUS_PREFIX + ".dump_timestamp" + dump_timestamp = cache.get(cache_key) + if dump_timestamp is None: + try: + dump = db_dump.get_dump_entries()[0] # return the latest dump + dump_timestamp = int(dump["created"].timestamp()) + cache.set(cache_key, dump_timestamp, DUMP_CACHE_TIME) + except IndexError: + return None + + return dump_timestamp + + +def get_service_status(): + """ Fetch the age of the last output of various services and return a dict: + { + "dump_age": null, + "incoming_listen_count": 2, + "playlists_age": 63229, + "stats_age": 418605, + "time": 1731429303 + } + """ + + current_ts = int(time()) + + dump = get_dump_timestamp() + if dump is None: + dump_age = None + else: + dump_age = current_ts - dump + + listen_count = get_incoming_listens_count() + + stats = get_stats_timestamp() + if stats is None: + stats_age = None + else: + stats_age = current_ts - stats + + playlists = get_playlists_timestamp() + if playlists is None: + playlists_age = None + else: + playlists_age = current_ts - playlists + + return { + "time": current_ts, + "dump_age": dump_age, + "stats_age": stats_age, + "playlists_age": playlists_age, + "incoming_listen_count": listen_count + } + + +@status_api_bp.route("/service-status", methods=["GET"]) +@ratelimit() +def service_status(): + """ Fetch the recently updated metrics for age of stats, playlists, dumps and the number of items in the incoming + queue. This function returns JSON: + + .. code-block:: json + + { + "time": 155574537, + "stats": { + "seconds_since_last_update": 1204 + }, + "incoming_listens": { + "count": 1028 + } + } + + :statuscode 200: You have data. + :resheader Content-Type: *application/json* + """ + + return jsonify(get_service_status())