From 9cc68bc266b8d3161ae2c73c2d03bca59a6159a9 Mon Sep 17 00:00:00 2001 From: Kartik Ohri Date: Mon, 6 Jan 2025 14:31:05 +0530 Subject: [PATCH 01/20] Experiment incremental user stats --- listenbrainz_spark/path.py | 3 + .../stats/incremental/user/__init__.py | 0 .../stats/incremental/user/artist.py | 120 ++++++++++++++++++ .../stats/incremental/user/entity.py | 117 +++++++++++++++++ listenbrainz_spark/stats/user/entity.py | 43 +++---- 5 files changed, 261 insertions(+), 22 deletions(-) create mode 100644 listenbrainz_spark/stats/incremental/user/__init__.py create mode 100644 listenbrainz_spark/stats/incremental/user/artist.py create mode 100644 listenbrainz_spark/stats/incremental/user/entity.py diff --git a/listenbrainz_spark/path.py b/listenbrainz_spark/path.py index 84282b805a..7f9c2d48bb 100644 --- a/listenbrainz_spark/path.py +++ b/listenbrainz_spark/path.py @@ -5,6 +5,9 @@ LISTENBRAINZ_INTERMEDIATE_STATS_DIRECTORY = os.path.join('/', 'data', 'stats-new') +LISTENBRAINZ_USER_STATS_AGG_DIRECTORY = os.path.join('/', 'user_stats_aggregates') +LISTENBRAINZ_USER_STATS_BOOKKEEPING_DIRECTORY = os.path.join('/', 'user_stats_bookkeeping') + LISTENBRAINZ_BASE_STATS_DIRECTORY = os.path.join('/', 'stats') LISTENBRAINZ_SITEWIDE_STATS_DIRECTORY = os.path.join(LISTENBRAINZ_BASE_STATS_DIRECTORY, 'sitewide') diff --git a/listenbrainz_spark/stats/incremental/user/__init__.py b/listenbrainz_spark/stats/incremental/user/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/listenbrainz_spark/stats/incremental/user/artist.py b/listenbrainz_spark/stats/incremental/user/artist.py new file mode 100644 index 0000000000..e6660a082f --- /dev/null +++ b/listenbrainz_spark/stats/incremental/user/artist.py @@ -0,0 +1,120 @@ +from typing import List + +from pyspark.sql.types import StructType, StructField, StringType, IntegerType + +from listenbrainz_spark.path import ARTIST_COUNTRY_CODE_DATAFRAME +from listenbrainz_spark.stats import run_query +from listenbrainz_spark.stats.incremental.user.entity import UserEntity + + +class ArtistUserEntity(UserEntity): + + def __init__(self): + super().__init__(entity="artists") + + def get_cache_tables(self) -> List[str]: + return [ARTIST_COUNTRY_CODE_DATAFRAME] + + def get_partial_aggregate_schema(self): + return StructType([ + StructField('user_id', IntegerType(), nullable=False), + StructField('artist_name', StringType(), nullable=False), + StructField('artist_mbid', StringType(), nullable=True), + StructField('listen_count', IntegerType(), nullable=False), + ]) + + def aggregate(self, table, cache_tables): + cache_table = cache_tables[0] + result = run_query(f""" + WITH exploded_listens AS ( + SELECT user_id + , artist_name AS artist_credit_name + , explode_outer(artist_credit_mbids) AS artist_mbid + FROM {table} + ), listens_with_mb_data as ( + SELECT user_id + , COALESCE(at.artist_name, el.artist_credit_name) AS artist_name + , el.artist_mbid + FROM exploded_listens el + LEFT JOIN {cache_table} at + ON el.artist_mbid = at.artist_mbid + ) + SELECT user_id + -- we group by lower(artist_name) and pick the first artist name for cases where + -- the artist name differs in case. for mapped listens the artist name from MB will + -- be used. for unmapped listens we can't know which case is correct so use any. note + -- that due to presence of artist mbid as the third group, mapped and unmapped listens + -- will always be separately grouped therefore first will work fine for unmapped + -- listens and doesn't matter for mapped ones. 
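+                 -- e.g. unmapped listens crediting "beatles" and "Beatles" (artist_mbid NULL)
+                 -- land in one group and surface whichever spelling first() returns, while a
+                 -- mapped group normally surfaces the canonical MB name joined in above.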
+ , first(artist_name) AS artist_name + , artist_mbid + , count(*) AS listen_count + FROM listens_with_mb_data + GROUP BY user_id + , lower(artist_name) + , artist_mbid + """) + return result + + def combine_aggregates(self, existing_aggregate, incremental_aggregate): + query = f""" + WITH intermediate_table AS ( + SELECT user_id + , artist_name + , artist_mbid + , listen_count + FROM {existing_aggregate} + UNION ALL + SELECT user_id + , artist_name + , artist_mbid + , listen_count + FROM {incremental_aggregate} + ) + SELECT first(artist_name) AS artist_name + , artist_mbid + , sum(listen_count) as total_listen_count + FROM intermediate_table + GROUP BY lower(artist_name) + , artist_mbid + """ + return run_query(query) + + def get_top_n(self, final_aggregate, N): + query = f""" + WITH entity_count AS ( + SELECT user_id + , count(*) AS total_count + FROM {final_aggregate} + GROUP BY user_id + ), ranked_stats AS ( + SELECT user_id + , artist_name + , artist_mbid + , listen_count + , row_number() OVER (PARTITION BY user_id ORDER BY listen_count DESC) AS rank + FROM {final_aggregate} + ), grouped_stats AS ( + SELECT user_id + , sort_array( + collect_list( + struct( + listen_count + , artist_name + , artist_mbid + ) + ) + , false + ) as artists + FROM ranked_stats + WHERE rank <= {N} + GROUP BY user_id + ) + SELECT user_id + , artists_count + , artists + FROM grouped_stats + JOIN entity_count + USING (user_id) + """ + return run_query(query) diff --git a/listenbrainz_spark/stats/incremental/user/entity.py b/listenbrainz_spark/stats/incremental/user/entity.py new file mode 100644 index 0000000000..f45b39c38d --- /dev/null +++ b/listenbrainz_spark/stats/incremental/user/entity.py @@ -0,0 +1,117 @@ +import abc +import logging +from datetime import datetime +from pathlib import Path +from typing import List + +from pyspark.errors import AnalysisException +from pyspark.sql import DataFrame +from pyspark.sql.types import StructType, StructField, TimestampType + +import listenbrainz_spark +from listenbrainz_spark import hdfs_connection +from listenbrainz_spark.config import HDFS_CLUSTER_URI +from listenbrainz_spark.path import INCREMENTAL_DUMPS_SAVE_PATH, \ + LISTENBRAINZ_USER_STATS_AGG_DIRECTORY, LISTENBRAINZ_USER_STATS_BOOKKEEPING_DIRECTORY +from listenbrainz_spark.utils import read_files_from_HDFS, get_listens_from_dump + + +logger = logging.getLogger(__name__) +BOOKKEEPING_SCHEMA = StructType([ + StructField('from_date', TimestampType(), nullable=False), + StructField('to_date', TimestampType(), nullable=False), + StructField('created', TimestampType(), nullable=False), +]) + + +class UserEntity(abc.ABC): + + def __init__(self, entity): + self.entity = entity + + def get_existing_aggregate_path(self, stats_range) -> str: + return f"{LISTENBRAINZ_USER_STATS_AGG_DIRECTORY}/{self.entity}/{stats_range}" + + def get_bookkeeping_path(self, stats_range) -> str: + return f"{LISTENBRAINZ_USER_STATS_BOOKKEEPING_DIRECTORY}/{self.entity}/{stats_range}" + + def get_partial_aggregate_schema(self) -> StructType: + raise NotImplementedError() + + def aggregate(self, table, cache_tables) -> DataFrame: + raise NotImplementedError() + + def combine_aggregates(self, existing_aggregate, incremental_aggregate) -> DataFrame: + raise NotImplementedError() + + def get_top_n(self, final_aggregate, N) -> DataFrame: + raise NotImplementedError() + + def get_cache_tables(self) -> List[str]: + raise NotImplementedError() + + def generate_stats(self, stats_range: str, from_date: datetime, + to_date: datetime, top_entity_limit: 
int): + cache_tables = [] + for idx, df_path in enumerate(self.get_cache_tables()): + df_name = f"entity_data_cache_{idx}" + cache_tables.append(df_name) + read_files_from_HDFS(df_path).createOrReplaceTempView(df_name) + + metadata_path = self.get_bookkeeping_path(stats_range) + try: + metadata = listenbrainz_spark \ + .session \ + .read \ + .schema(BOOKKEEPING_SCHEMA) \ + .json(f"{HDFS_CLUSTER_URI}{metadata_path}") \ + .collect()[0] + existing_from_date, existing_to_date = metadata["from_date"], metadata["to_date"] + existing_aggregate_usable = existing_from_date.date() == from_date.date() + except AnalysisException: + existing_aggregate_usable = False + logger.info("Existing partial aggregate not found!") + + prefix = f"user_{self.entity}_{stats_range}" + existing_aggregate_path = self.get_existing_aggregate_path(stats_range) + + if not hdfs_connection.client.status(existing_aggregate_path, strict=False) or not existing_aggregate_usable: + table = f"{prefix}_full_listens" + get_listens_from_dump(from_date, to_date, include_incremental=False).createOrReplaceTempView(table) + + logger.info("Creating partial aggregate from full dump listens") + hdfs_connection.client.makedirs(Path(existing_aggregate_path).parent) + full_df = self.aggregate(table, cache_tables) + full_df.write.mode("overwrite").parquet(existing_aggregate_path) + + hdfs_connection.client.makedirs(Path(metadata_path).parent) + metadata_df = listenbrainz_spark.session.createDataFrame( + [(from_date, to_date, datetime.now())], + schema=BOOKKEEPING_SCHEMA + ) + metadata_df.write.mode("overwrite").json(metadata_path) + + full_df = read_files_from_HDFS(existing_aggregate_path) + + if hdfs_connection.client.status(INCREMENTAL_DUMPS_SAVE_PATH, strict=False): + table = f"{prefix}_incremental_listens" + read_files_from_HDFS(INCREMENTAL_DUMPS_SAVE_PATH) \ + .createOrReplaceTempView(table) + inc_df = self.aggregate(table, cache_tables) + else: + inc_df = listenbrainz_spark.session.createDataFrame([], schema=self.get_partial_aggregate_schema()) + + full_table = f"{prefix}_existing_aggregate" + full_df.createOrReplaceTempView(full_table) + + inc_table = f"{prefix}_incremental_aggregate" + inc_df.createOrReplaceTempView(inc_table) + + combined_df = self.combine_aggregates(full_table, inc_table) + + combined_table = f"{prefix}_combined_aggregate" + combined_df.createOrReplaceTempView(combined_table) + results_df = self.get_top_n(combined_table, top_entity_limit) + + return results_df.toLocalIterator() + \ No newline at end of file diff --git a/listenbrainz_spark/stats/user/entity.py b/listenbrainz_spark/stats/user/entity.py index 2dc53e5910..5efcfa8eb2 100644 --- a/listenbrainz_spark/stats/user/entity.py +++ b/listenbrainz_spark/stats/user/entity.py @@ -13,6 +13,7 @@ from listenbrainz_spark.path import RELEASE_METADATA_CACHE_DATAFRAME, ARTIST_COUNTRY_CODE_DATAFRAME, \ RELEASE_GROUP_METADATA_CACHE_DATAFRAME, RECORDING_ARTIST_DATAFRAME from listenbrainz_spark.stats import get_dates_for_stats_range +from listenbrainz_spark.stats.incremental.user.artist import ArtistUserEntity from listenbrainz_spark.stats.user import USERS_PER_MESSAGE from listenbrainz_spark.stats.user.artist import get_artists from listenbrainz_spark.stats.user.recording import get_recordings @@ -75,29 +76,27 @@ def get_entity_stats_for_range( database: str = None ): """ Calculate entity stats for all users' listens between the start and the end datetime. 
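Stepping back, generate_stats above reduces to: reuse the persisted full-dump aggregate while the bookkeeping dates still match, aggregate just the incremental listens, combine, then rank. A minimal pure-Python sketch of those combine-and-rank semantics on toy data (illustrative only, not the Spark code):

    from collections import Counter

    def combine_and_rank(existing, incremental, n=3):
        # both inputs: {(user_id, artist): listen_count} partial aggregates
        combined = Counter(existing)
        combined.update(incremental)  # listen counts sum per (user, artist) key
        per_user = {}
        for (user, artist), count in combined.items():
            per_user.setdefault(user, []).append((count, artist))
        # mirrors row_number() OVER (PARTITION BY user_id ORDER BY listen_count DESC)
        return {user: sorted(rows, reverse=True)[:n] for user, rows in per_user.items()}

    full = {(1, "a"): 10, (1, "b"): 4}
    inc = {(1, "b"): 2, (1, "c"): 1}
    assert combine_and_rank(full, inc)[1] == [(10, "a"), (6, "b"), (1, "c")]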
""" - listens_df = get_listens_from_dump(from_date, to_date) - table = f"user_{entity}_{stats_range}" - listens_df.createOrReplaceTempView(table) - - cache_dfs = [] - for idx, df_path in enumerate(entity_cache_map.get(entity)): - df_name = f"entity_data_cache_{idx}" - cache_dfs.append(df_name) - read_files_from_HDFS(df_path).createOrReplaceTempView(df_name) - - return calculate_entity_stats( - from_date, to_date, table, cache_dfs, entity, stats_range, message_type, database - ) - - -def calculate_entity_stats(from_date: datetime, to_date: datetime, table: str, cache_tables: List[str], - entity: str, stats_range: str, message_type: str, database: str = None): - handler = entity_handler_map[entity] - if message_type == "year_in_music_top_stats": - number_of_results = NUMBER_OF_YIM_ENTITIES + if entity == "artists": + entity_obj = ArtistUserEntity() + data = entity_obj.generate_stats(stats_range, from_date, to_date, NUMBER_OF_TOP_ENTITIES) else: - number_of_results = NUMBER_OF_TOP_ENTITIES - data = handler(table, cache_tables, number_of_results) + listens_df = get_listens_from_dump(from_date, to_date) + table = f"user_{entity}_{stats_range}" + listens_df.createOrReplaceTempView(table) + + cache_dfs = [] + for idx, df_path in enumerate(entity_cache_map.get(entity)): + df_name = f"entity_data_cache_{idx}" + cache_dfs.append(df_name) + read_files_from_HDFS(df_path).createOrReplaceTempView(df_name) + + handler = entity_handler_map[entity] + if message_type == "year_in_music_top_stats": + number_of_results = NUMBER_OF_YIM_ENTITIES + else: + number_of_results = NUMBER_OF_TOP_ENTITIES + data = handler(table, cache_dfs, number_of_results) + return create_messages(data=data, entity=entity, stats_range=stats_range, from_date=from_date, to_date=to_date, message_type=message_type, database=database) From bd4cb15c543bd7e4fee02b4ae29fc03d1641d3f3 Mon Sep 17 00:00:00 2001 From: Kartik Ohri Date: Mon, 6 Jan 2025 14:44:16 +0530 Subject: [PATCH 02/20] fix combine aggregate query --- listenbrainz_spark/stats/incremental/user/artist.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/listenbrainz_spark/stats/incremental/user/artist.py b/listenbrainz_spark/stats/incremental/user/artist.py index e6660a082f..d3dc97f0c2 100644 --- a/listenbrainz_spark/stats/incremental/user/artist.py +++ b/listenbrainz_spark/stats/incremental/user/artist.py @@ -71,11 +71,13 @@ def combine_aggregates(self, existing_aggregate, incremental_aggregate): , listen_count FROM {incremental_aggregate} ) - SELECT first(artist_name) AS artist_name + SELECT user_id + , first(artist_name) AS artist_name , artist_mbid - , sum(listen_count) as total_listen_count + , sum(listen_count) as listen_count FROM intermediate_table - GROUP BY lower(artist_name) + GROUP BY user_id + , lower(artist_name) , artist_mbid """ return run_query(query) From b265a12e2e809778543f600d7ae47d2b11996997 Mon Sep 17 00:00:00 2001 From: Kartik Ohri Date: Mon, 6 Jan 2025 14:45:57 +0530 Subject: [PATCH 03/20] fix get_top_n query --- listenbrainz_spark/stats/incremental/user/artist.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/listenbrainz_spark/stats/incremental/user/artist.py b/listenbrainz_spark/stats/incremental/user/artist.py index d3dc97f0c2..c892854680 100644 --- a/listenbrainz_spark/stats/incremental/user/artist.py +++ b/listenbrainz_spark/stats/incremental/user/artist.py @@ -86,7 +86,7 @@ def get_top_n(self, final_aggregate, N): query = f""" WITH entity_count AS ( SELECT user_id - , count(*) AS total_count + , count(*) AS 
artists_count FROM {final_aggregate} GROUP BY user_id ), ranked_stats AS ( From 224abc5b728e326ed0434b7810231b3f50733f3c Mon Sep 17 00:00:00 2001 From: Kartik Ohri Date: Mon, 6 Jan 2025 17:00:11 +0530 Subject: [PATCH 04/20] add remaining entities for incremental user stats --- .../stats/incremental/user/recording.py | 173 ++++++++++++++++++ .../stats/incremental/user/release.py | 164 +++++++++++++++++ .../stats/incremental/user/release_group.py | 170 +++++++++++++++++ listenbrainz_spark/stats/user/entity.py | 34 ++-- 4 files changed, 522 insertions(+), 19 deletions(-) create mode 100644 listenbrainz_spark/stats/incremental/user/recording.py create mode 100644 listenbrainz_spark/stats/incremental/user/release.py create mode 100644 listenbrainz_spark/stats/incremental/user/release_group.py diff --git a/listenbrainz_spark/stats/incremental/user/recording.py b/listenbrainz_spark/stats/incremental/user/recording.py new file mode 100644 index 0000000000..a04840ac6b --- /dev/null +++ b/listenbrainz_spark/stats/incremental/user/recording.py @@ -0,0 +1,173 @@ +from typing import List + +from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType + +from listenbrainz_spark.path import ARTIST_COUNTRY_CODE_DATAFRAME, RECORDING_ARTIST_DATAFRAME, \ + RELEASE_METADATA_CACHE_DATAFRAME +from listenbrainz_spark.schema import artists_column_schema +from listenbrainz_spark.stats import run_query +from listenbrainz_spark.stats.incremental.user.entity import UserEntity + + +class RecordingUserEntity(UserEntity): + + def __init__(self): + super().__init__(entity="recordings") + + def get_cache_tables(self) -> List[str]: + return [RECORDING_ARTIST_DATAFRAME, RELEASE_METADATA_CACHE_DATAFRAME] + + def get_partial_aggregate_schema(self): + return StructType([ + StructField('user_id', IntegerType(), nullable=False), + StructField('recording_name', StringType(), nullable=False), + StructField('recording_mbid', StringType(), nullable=True), + StructField('artist_name', StringType(), nullable=False), + StructField('artist_credit_mbids', ArrayType(StringType()), nullable=True), + StructField('release_name', StringType(), nullable=True), + StructField('release_mbid', StringType(), nullable=True), + StructField('artists', artists_column_schema, nullable=True), + StructField('caa_id', IntegerType(), nullable=True), + StructField('caa_release_mbid', StringType(), nullable=True), + StructField('listen_count', IntegerType(), nullable=False), + ]) + + def aggregate(self, table, cache_tables): + rec_cache_table = cache_tables[0] + rel_cache_table = cache_tables[1] + result = run_query(f""" + SELECT user_id + , first(l.recording_name) AS recording_name + , nullif(l.recording_mbid, '') AS recording_mbid + , first(l.artist_name) AS artist_name + , l.artist_credit_mbids + , nullif(first(l.release_name), '') as release_name + , nullif(l.release_mbid, '') AS release_mbid + , rec.artists + , rel.caa_id + , rel.caa_release_mbid + , count(*) as listen_count + FROM {table} l + LEFT JOIN {rec_cache_table} rec + ON rec.recording_mbid = l.recording_mbid + LEFT JOIN {rel_cache_table} rel + ON rel.release_mbid = l.release_mbid + GROUP BY l.user_id + , lower(l.recording_name) + , l.recording_mbid + , lower(l.artist_name) + , l.artist_credit_mbids + , lower(l.release_name) + , l.release_mbid + , rec.artists + , rel.caa_id + , rel.caa_release_mbid + """) + return result + + def combine_aggregates(self, existing_aggregate, incremental_aggregate): + query = f""" + WITH intermediate_table AS ( + SELECT user_id + , 
recording_name + , recording_mbid + , artist_name + , artist_credit_mbids + , release_name + , release_mbid + , artists + , caa_id + , caa_release_mbid + , listen_count + FROM {existing_aggregate} + UNION ALL + SELECT user_id + , recording_name + , recording_mbid + , artist_name + , artist_credit_mbids + , release_name + , release_mbid + , artists + , caa_id + , caa_release_mbid + , listen_count + FROM {incremental_aggregate} + ) + SELECT user_id + , first(recording_name) AS recording_name + , recording_mbid + , first(artist_name) AS artist_name + , artist_credit_mbids + , first(release_name) AS release_name + , release_mbid + , artists + , caa_id + , caa_release_mbid + , sum(listen_count) as listen_count + FROM intermediate_table + GROUP BY user_id + , lower(recording_name) + , recording_mbid + , lower(artist_name) + , artist_credit_mbids + , lower(release_name) + , release_mbid + , artists + , caa_id + , caa_release_mbid + """ + return run_query(query) + + def get_top_n(self, final_aggregate, N): + query = f""" + WITH entity_count AS ( + SELECT user_id + , count(*) AS recordings_count + FROM {final_aggregate} + GROUP BY user_id + ), ranked_stats AS ( + SELECT user_id + , recording_name as track_name + , recording_mbid + , release_name + , release_mbid + , artist_name + , artist_credit_mbids + , artists + , caa_id + , caa_release_mbid + , listen_count + , row_number() OVER (PARTITION BY user_id ORDER BY listen_count DESC) AS rank + FROM {final_aggregate} + ), grouped_stats AS ( + SELECT user_id + , sort_array( + collect_list( + struct( + listen_count + , track_name + , recording_mbid + , artist_name + , coalesce(artist_credit_mbids, array()) AS artist_mbids + , release_name + , release_mbid + , artists + , caa_id + , caa_release_mbid + ) + ) + , false + ) as recordings + FROM ranked_stats + WHERE rank <= {N} + GROUP BY user_id + ) + SELECT user_id + , recordings_count + , recordings + FROM grouped_stats + JOIN entity_count + USING (user_id) + """ + return run_query(query) diff --git a/listenbrainz_spark/stats/incremental/user/release.py b/listenbrainz_spark/stats/incremental/user/release.py new file mode 100644 index 0000000000..5d2dc17838 --- /dev/null +++ b/listenbrainz_spark/stats/incremental/user/release.py @@ -0,0 +1,164 @@ +from typing import List + +from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType + +from listenbrainz_spark.path import ARTIST_COUNTRY_CODE_DATAFRAME, RELEASE_METADATA_CACHE_DATAFRAME +from listenbrainz_spark.schema import artists_column_schema +from listenbrainz_spark.stats import run_query +from listenbrainz_spark.stats.incremental.user.entity import UserEntity + + +class ReleaseUserEntity(UserEntity): + + def __init__(self): + super().__init__(entity="releases") + + def get_cache_tables(self) -> List[str]: + return [RELEASE_METADATA_CACHE_DATAFRAME] + + def get_partial_aggregate_schema(self): + return StructType([ + StructField('user_id', IntegerType(), nullable=False), + StructField('release_name', StringType(), nullable=False), + StructField('release_mbid', StringType(), nullable=False), + StructField('artist_name', StringType(), nullable=False), + StructField('artist_credit_mbids', ArrayType(StringType()), nullable=False), + StructField('artists', artists_column_schema, nullable=True), + StructField('caa_id', IntegerType(), nullable=True), + StructField('caa_release_mbid', StringType(), nullable=True), + StructField('listen_count', IntegerType(), nullable=False), + ]) + + def aggregate(self, table, cache_tables): + 
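+        # cache_tables holds the temp-view names registered by generate_stats; for
+        # releases the single entry is the release metadata cache (see get_cache_tables)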
cache_table = cache_tables[0] + result = run_query(f""" + WITH gather_release_data AS ( + SELECT user_id + , nullif(l.release_mbid, '') AS any_release_mbid + , COALESCE(rel.release_name, l.release_name) AS any_release_name + , COALESCE(rel.album_artist_name, l.artist_name) AS release_artist_name + , COALESCE(rel.artist_credit_mbids, l.artist_credit_mbids) AS artist_credit_mbids + , rel.artists + , rel.caa_id + , rel.caa_release_mbid + FROM {table} l + LEFT JOIN {cache_table} rel + ON rel.release_mbid = l.release_mbid + ) + SELECT user_id + , first(any_release_name) AS release_name + , release_mbid + , first(release_artist_name) AS artist_name + , artist_credit_mbids + , artists + , caa_id + , caa_release_mbid + , count(*) as listen_count + FROM gather_release_data + WHERE release_name != '' + AND release_name IS NOT NULL + GROUP BY user_id + , lower(release_name) + , release_mbid + , lower(release_artist_name) + , artist_credit_mbids + , artists + , caa_id + , caa_release_mbid + """) + return result + + def combine_aggregates(self, existing_aggregate, incremental_aggregate): + query = f""" + WITH intermediate_table AS ( + SELECT user_id + , release_name + , release_mbid + , artist_name + , artist_credit_mbids + , artists + , caa_id + , caa_release_mbid + , listen_count + FROM {existing_aggregate} + UNION ALL + SELECT user_id + , release_name + , release_mbid + , artist_name + , artist_credit_mbids + , artists + , caa_id + , caa_release_mbid + , listen_count + FROM {incremental_aggregate} + ) + SELECT user_id + , first(release_name) AS release_name + , release_mbid + , first(artist_name) AS artist_name + , artist_credit_mbids + , artists + , caa_id + , caa_release_mbid + , sum(listen_count) as listen_count + FROM intermediate_table + GROUP BY user_id + , lower(release_name) + , release_mbid + , lower(artist_name) + , artist_credit_mbids + , artists + , caa_id + , caa_release_mbid + """ + return run_query(query) + + def get_top_n(self, final_aggregate, N): + query = f""" + WITH entity_count AS ( + SELECT user_id + , count(*) AS releases_count + FROM {final_aggregate} + GROUP BY user_id + ), ranked_stats AS ( + SELECT user_id + , release_name + , release_mbid + , artist_name + , artist_credit_mbids + , artists + , caa_id + , caa_release_mbid + , listen_count + , row_number() OVER (PARTITION BY user_id ORDER BY listen_count DESC) AS rank + FROM {final_aggregate} + ), grouped_stats AS ( + SELECT user_id + , sort_array( + collect_list( + struct( + listen_count + , release_name + , release_mbid + , artist_name + , coalesce(artist_credit_mbids, array()) AS artist_mbids + , artists + , caa_id + , caa_release_mbid + ) + ) + , false + ) as releases + FROM ranked_stats + WHERE rank <= {N} + GROUP BY user_id + ) + SELECT user_id + , releases_count + , releases + FROM grouped_stats + JOIN entity_count + USING (user_id) + """ + return run_query(query) diff --git a/listenbrainz_spark/stats/incremental/user/release_group.py b/listenbrainz_spark/stats/incremental/user/release_group.py new file mode 100644 index 0000000000..115834913f --- /dev/null +++ b/listenbrainz_spark/stats/incremental/user/release_group.py @@ -0,0 +1,170 @@ +from typing import List + +from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType + +from listenbrainz_spark.path import ARTIST_COUNTRY_CODE_DATAFRAME, RELEASE_METADATA_CACHE_DATAFRAME, \ + RELEASE_GROUP_METADATA_CACHE_DATAFRAME +from listenbrainz_spark.schema import artists_column_schema +from listenbrainz_spark.stats import run_query +from 
listenbrainz_spark.stats.incremental.user.entity import UserEntity + + +class ReleaseGroupUserEntity(UserEntity): + + def __init__(self): + super().__init__(entity="release_groups") + + def get_cache_tables(self) -> List[str]: + return [RELEASE_METADATA_CACHE_DATAFRAME, RELEASE_GROUP_METADATA_CACHE_DATAFRAME] + + def get_partial_aggregate_schema(self): + return StructType([ + StructField('user_id', IntegerType(), nullable=False), + StructField('release_group_name', StringType(), nullable=False), + StructField('release_group_mbid', StringType(), nullable=False), + StructField('artist_name', StringType(), nullable=False), + StructField('artist_credit_mbids', ArrayType(StringType()), nullable=False), + StructField('artists', artists_column_schema, nullable=True), + StructField('caa_id', IntegerType(), nullable=True), + StructField('caa_release_mbid', StringType(), nullable=True), + StructField('listen_count', IntegerType(), nullable=False), + ]) + + def aggregate(self, table, cache_tables): + rel_cache_table = cache_tables[0] + rg_cache_table = cache_tables[1] + result = run_query(f""" + WITH gather_release_data AS ( + SELECT user_id + , rg.release_group_mbid + -- this is intentional as we don't have a release group name field in listen submission json + -- and for the purposes of this stat, they'd usually be the same. + , COALESCE(rg.title, l.release_name) AS release_group_name + , COALESCE(rg.artist_credit_name, l.artist_name) AS release_group_artist_name + , COALESCE(rg.artist_credit_mbids, l.artist_credit_mbids) AS artist_credit_mbids + , rg.artists + , rg.caa_id + , rg.caa_release_mbid + FROM {table} l + LEFT JOIN {rel_cache_table} rel + ON rel.release_mbid = l.release_mbid + LEFT JOIN {rg_cache_table} rg + ON rg.release_group_mbid = rel.release_group_mbid + ) + SELECT user_id + , first(release_group_name) AS release_group_name + , release_group_mbid + , first(release_group_artist_name) AS artist_name + , artist_credit_mbids + , caa_id + , caa_release_mbid + , artists + , count(*) as listen_count + FROM gather_release_data + WHERE release_group_name != '' + AND release_group_name IS NOT NULL + GROUP BY user_id + , lower(release_group_name) + , release_group_mbid + , lower(release_group_artist_name) + , artist_credit_mbids + , caa_id + , caa_release_mbid + , artists + """) + return result + + def combine_aggregates(self, existing_aggregate, incremental_aggregate): + query = f""" + WITH intermediate_table AS ( + SELECT user_id + , release_group_name + , release_group_mbid + , artist_name + , artist_credit_mbids + , artists + , caa_id + , caa_release_mbid + , listen_count + FROM {existing_aggregate} + UNION ALL + SELECT user_id + , release_group_name + , release_group_mbid + , artist_name + , artist_credit_mbids + , artists + , caa_id + , caa_release_mbid + , listen_count + FROM {incremental_aggregate} + ) + SELECT user_id + , first(release_group_name) AS release_group_name + , release_group_mbid + , first(artist_name) AS artist_name + , artist_credit_mbids + , artists + , caa_id + , caa_release_mbid + , sum(listen_count) as listen_count + FROM intermediate_table + GROUP BY user_id + , lower(release_group_name) + , release_group_mbid + , lower(artist_name) + , artist_credit_mbids + , artists + , caa_id + , caa_release_mbid + """ + return run_query(query) + + def get_top_n(self, final_aggregate, N): + query = f""" + WITH entity_count AS ( + SELECT user_id + , count(*) AS release_groups_count + FROM {final_aggregate} + GROUP BY user_id + ), ranked_stats AS ( + SELECT user_id + , 
release_group_name + , release_group_mbid + , artist_name + , artist_credit_mbids + , artists + , caa_id + , caa_release_mbid + , listen_count + , row_number() OVER (PARTITION BY user_id ORDER BY listen_count DESC) AS rank + FROM {final_aggregate} + ), grouped_stats AS ( + SELECT user_id + , sort_array( + collect_list( + struct( + listen_count + , release_group_name + , release_group_mbid + , artist_name + , coalesce(artist_credit_mbids, array()) AS artist_mbids + , artists + , caa_id + , caa_release_mbid + ) + ) + , false + ) as release_groups + FROM ranked_stats + WHERE rank <= {N} + GROUP BY user_id + ) + SELECT user_id + , release_groups_count + , release_groups + FROM grouped_stats + JOIN entity_count + USING (user_id) + """ + return run_query(query) diff --git a/listenbrainz_spark/stats/user/entity.py b/listenbrainz_spark/stats/user/entity.py index 5efcfa8eb2..d3cda78043 100644 --- a/listenbrainz_spark/stats/user/entity.py +++ b/listenbrainz_spark/stats/user/entity.py @@ -14,6 +14,9 @@ RELEASE_GROUP_METADATA_CACHE_DATAFRAME, RECORDING_ARTIST_DATAFRAME from listenbrainz_spark.stats import get_dates_for_stats_range from listenbrainz_spark.stats.incremental.user.artist import ArtistUserEntity +from listenbrainz_spark.stats.incremental.user.recording import RecordingUserEntity +from listenbrainz_spark.stats.incremental.user.release import ReleaseUserEntity +from listenbrainz_spark.stats.incremental.user.release_group import ReleaseGroupUserEntity from listenbrainz_spark.stats.user import USERS_PER_MESSAGE from listenbrainz_spark.stats.user.artist import get_artists from listenbrainz_spark.stats.user.recording import get_recordings @@ -44,6 +47,13 @@ "release_groups": [RELEASE_METADATA_CACHE_DATAFRAME, RELEASE_GROUP_METADATA_CACHE_DATAFRAME] } +incremental_entity_obj_map = { + "artists": ArtistUserEntity(), + "releases": ReleaseUserEntity(), + "recordings": RecordingUserEntity(), + "release_groups": ReleaseGroupUserEntity(), +} + NUMBER_OF_TOP_ENTITIES = 1000 # number of top entities to retain for user stats NUMBER_OF_YIM_ENTITIES = 50 # number of top entities to retain for Year in Music stats @@ -76,27 +86,13 @@ def get_entity_stats_for_range( database: str = None ): """ Calculate entity stats for all users' listens between the start and the end datetime. 
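With all four entity classes wired into incremental_entity_obj_map above, the per-entity branching below collapses into a table lookup. A sketch of the pattern (names from this diff; error handling omitted):

    # one shared stats object per entity type; each knows its own caches and SQL
    entity_obj = incremental_entity_obj_map[entity]   # e.g. "artists" -> ArtistUserEntity()
    data = entity_obj.generate_stats(stats_range, from_date, to_date, number_of_results)

Note these are module-level singletons; patch 12 later switches the map to classes and instantiates one per request, once stats_range becomes constructor state.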
""" - if entity == "artists": - entity_obj = ArtistUserEntity() - data = entity_obj.generate_stats(stats_range, from_date, to_date, NUMBER_OF_TOP_ENTITIES) + if message_type == "year_in_music_top_stats": + number_of_results = NUMBER_OF_YIM_ENTITIES else: - listens_df = get_listens_from_dump(from_date, to_date) - table = f"user_{entity}_{stats_range}" - listens_df.createOrReplaceTempView(table) - - cache_dfs = [] - for idx, df_path in enumerate(entity_cache_map.get(entity)): - df_name = f"entity_data_cache_{idx}" - cache_dfs.append(df_name) - read_files_from_HDFS(df_path).createOrReplaceTempView(df_name) - - handler = entity_handler_map[entity] - if message_type == "year_in_music_top_stats": - number_of_results = NUMBER_OF_YIM_ENTITIES - else: - number_of_results = NUMBER_OF_TOP_ENTITIES - data = handler(table, cache_dfs, number_of_results) + number_of_results = NUMBER_OF_TOP_ENTITIES + entity_obj = incremental_entity_obj_map[entity] + data = entity_obj.generate_stats(stats_range, from_date, to_date, number_of_results) return create_messages(data=data, entity=entity, stats_range=stats_range, from_date=from_date, to_date=to_date, message_type=message_type, database=database) From 862f6b58b6e915bc6a77824ca8dacf4763167749 Mon Sep 17 00:00:00 2001 From: Kartik Ohri Date: Mon, 6 Jan 2025 19:05:13 +0530 Subject: [PATCH 05/20] fix release query --- listenbrainz_spark/stats/incremental/user/release.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/listenbrainz_spark/stats/incremental/user/release.py b/listenbrainz_spark/stats/incremental/user/release.py index 5d2dc17838..c48e12811d 100644 --- a/listenbrainz_spark/stats/incremental/user/release.py +++ b/listenbrainz_spark/stats/incremental/user/release.py @@ -47,7 +47,7 @@ def aggregate(self, table, cache_tables): ) SELECT user_id , first(any_release_name) AS release_name - , release_mbid + , any_release_mbid AS release_mbid , first(release_artist_name) AS artist_name , artist_credit_mbids , artists @@ -58,8 +58,8 @@ def aggregate(self, table, cache_tables): WHERE release_name != '' AND release_name IS NOT NULL GROUP BY user_id - , lower(release_name) - , release_mbid + , lower(any_release_name) + , any_release_mbid , lower(release_artist_name) , artist_credit_mbids , artists From 0e3861e48841659f845e76f5e9f3be8cb658b13d Mon Sep 17 00:00:00 2001 From: Kartik Ohri Date: Mon, 6 Jan 2025 19:08:00 +0530 Subject: [PATCH 06/20] fix release query - 2 --- listenbrainz_spark/stats/incremental/user/release.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/listenbrainz_spark/stats/incremental/user/release.py b/listenbrainz_spark/stats/incremental/user/release.py index c48e12811d..fd0886b662 100644 --- a/listenbrainz_spark/stats/incremental/user/release.py +++ b/listenbrainz_spark/stats/incremental/user/release.py @@ -55,8 +55,8 @@ def aggregate(self, table, cache_tables): , caa_release_mbid , count(*) as listen_count FROM gather_release_data - WHERE release_name != '' - AND release_name IS NOT NULL + WHERE any_release_name != '' + AND any_release_name IS NOT NULL GROUP BY user_id , lower(any_release_name) , any_release_mbid From 76c56586d994cec2c218fc5f6178bb3948beb30c Mon Sep 17 00:00:00 2001 From: Kartik Ohri Date: Tue, 7 Jan 2025 00:19:19 +0530 Subject: [PATCH 07/20] generate stats for only users in incremental dumps --- .../stats/incremental/user/entity.py | 24 +++++++++++++++++-- listenbrainz_spark/stats/user/entity.py | 24 ++++++++++--------- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git 
a/listenbrainz_spark/stats/incremental/user/entity.py b/listenbrainz_spark/stats/incremental/user/entity.py index f45b39c38d..3e558af360 100644 --- a/listenbrainz_spark/stats/incremental/user/entity.py +++ b/listenbrainz_spark/stats/incremental/user/entity.py @@ -13,6 +13,7 @@ from listenbrainz_spark.config import HDFS_CLUSTER_URI from listenbrainz_spark.path import INCREMENTAL_DUMPS_SAVE_PATH, \ LISTENBRAINZ_USER_STATS_AGG_DIRECTORY, LISTENBRAINZ_USER_STATS_BOOKKEEPING_DIRECTORY +from listenbrainz_spark.stats import run_query from listenbrainz_spark.utils import read_files_from_HDFS, get_listens_from_dump @@ -41,6 +42,14 @@ def get_partial_aggregate_schema(self) -> StructType: def aggregate(self, table, cache_tables) -> DataFrame: raise NotImplementedError() + def filter_existing_aggregate(self, existing_aggregate, incremental_aggregate): + query = f""" + SELECT * + FROM {existing_aggregate} ea + WHERE ea.user_id = EXISTS(SELECT 1 FROM {incremental_aggregate} ia WHERE ia.user_id = ea.user_id) + """ + return run_query(query) + def combine_aggregates(self, existing_aggregate, incremental_aggregate) -> DataFrame: raise NotImplementedError() @@ -75,6 +84,8 @@ def generate_stats(self, stats_range: str, from_date: datetime, prefix = f"user_{self.entity}_{stats_range}" existing_aggregate_path = self.get_existing_aggregate_path(stats_range) + only_inc_users = True + if not hdfs_connection.client.status(existing_aggregate_path, strict=False) or not existing_aggregate_usable: table = f"{prefix}_full_listens" get_listens_from_dump(from_date, to_date, include_incremental=False).createOrReplaceTempView(table) @@ -90,6 +101,7 @@ def generate_stats(self, stats_range: str, from_date: datetime, schema=BOOKKEEPING_SCHEMA ) metadata_df.write.mode("overwrite").json(metadata_path) + only_inc_users = False full_df = read_files_from_HDFS(existing_aggregate_path) @@ -100,6 +112,7 @@ def generate_stats(self, stats_range: str, from_date: datetime, inc_df = self.aggregate(table, cache_tables) else: inc_df = listenbrainz_spark.session.createDataFrame([], schema=self.get_partial_aggregate_schema()) + only_inc_users = False full_table = f"{prefix}_existing_aggregate" full_df.createOrReplaceTempView(full_table) @@ -107,11 +120,18 @@ def generate_stats(self, stats_range: str, from_date: datetime, inc_table = f"{prefix}_incremental_aggregate" inc_df.createOrReplaceTempView(inc_table) - combined_df = self.combine_aggregates(full_table, inc_table) + if only_inc_users: + existing_table = f"{prefix}_filtered_aggregate" + filtered_aggregate_df = self.filter_existing_aggregate(full_table, inc_table) + filtered_aggregate_df.createOrReplaceTempView(existing_table) + else: + existing_table = full_table + + combined_df = self.combine_aggregates(existing_table, inc_table) combined_table = f"{prefix}_combined_aggregate" combined_df.createOrReplaceTempView(combined_table) results_df = self.get_top_n(combined_table, top_entity_limit) - return results_df.toLocalIterator() + return only_inc_users, results_df.toLocalIterator() \ No newline at end of file diff --git a/listenbrainz_spark/stats/user/entity.py b/listenbrainz_spark/stats/user/entity.py index d3cda78043..dc8f14f6b2 100644 --- a/listenbrainz_spark/stats/user/entity.py +++ b/listenbrainz_spark/stats/user/entity.py @@ -92,8 +92,8 @@ def get_entity_stats_for_range( number_of_results = NUMBER_OF_TOP_ENTITIES entity_obj = incremental_entity_obj_map[entity] - data = entity_obj.generate_stats(stats_range, from_date, to_date, number_of_results) - return create_messages(data=data, 
entity=entity, stats_range=stats_range, from_date=from_date, + only_inc_users, data = entity_obj.generate_stats(stats_range, from_date, to_date, number_of_results) + return create_messages(only_inc_users, data=data, entity=entity, stats_range=stats_range, from_date=from_date, to_date=to_date, message_type=message_type, database=database) @@ -123,7 +123,7 @@ def parse_one_user_stats(entry, entity: str, stats_range: str): return None -def create_messages(data, entity: str, stats_range: str, from_date: datetime, to_date: datetime, +def create_messages(only_inc_users, data, entity: str, stats_range: str, from_date: datetime, to_date: datetime, message_type: str, database: str = None) \ -> Iterator[Optional[Dict]]: """ @@ -146,10 +146,11 @@ def create_messages(data, entity: str, stats_range: str, from_date: datetime, to if database is None: database = f"{entity}_{stats_range}" - yield { - "type": "couchdb_data_start", - "database": database - } + if not only_inc_users: + yield { + "type": "couchdb_data_start", + "database": database + } from_ts = int(from_date.timestamp()) to_ts = int(to_date.timestamp()) @@ -170,7 +171,8 @@ def create_messages(data, entity: str, stats_range: str, from_date: datetime, to "database": database } - yield { - "type": "couchdb_data_end", - "database": database - } + if not only_inc_users: + yield { + "type": "couchdb_data_end", + "database": database + } From c5693edd9372269c2bb4041bdbc03255e4008888 Mon Sep 17 00:00:00 2001 From: Kartik Ohri Date: Tue, 7 Jan 2025 00:26:32 +0530 Subject: [PATCH 08/20] fix filter query --- listenbrainz_spark/stats/incremental/user/entity.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/listenbrainz_spark/stats/incremental/user/entity.py b/listenbrainz_spark/stats/incremental/user/entity.py index 3e558af360..18fa39b4b0 100644 --- a/listenbrainz_spark/stats/incremental/user/entity.py +++ b/listenbrainz_spark/stats/incremental/user/entity.py @@ -44,9 +44,12 @@ def aggregate(self, table, cache_tables) -> DataFrame: def filter_existing_aggregate(self, existing_aggregate, incremental_aggregate): query = f""" + WITH incremental_users AS ( + SELECT DISTINCT user_id FROM {incremental_aggregate} + ) SELECT * FROM {existing_aggregate} ea - WHERE ea.user_id = EXISTS(SELECT 1 FROM {incremental_aggregate} ia WHERE ia.user_id = ea.user_id) + WHERE EXISTS(SELECT 1 FROM incremental_users iu WHERE iu.user_id = ea.user_id) """ return run_query(query) From 1165a3a4036a3555355c7f9b3d95421c650030af Mon Sep 17 00:00:00 2001 From: Kartik Ohri Date: Tue, 7 Jan 2025 00:59:49 +0530 Subject: [PATCH 09/20] do not create stats database daily if not needed --- listenbrainz/spark/request_manage.py | 6 ++---- listenbrainz_spark/stats/user/entity.py | 5 ++--- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/listenbrainz/spark/request_manage.py b/listenbrainz/spark/request_manage.py index f727617aa0..fbfa1d7882 100644 --- a/listenbrainz/spark/request_manage.py +++ b/listenbrainz/spark/request_manage.py @@ -103,11 +103,9 @@ def request_user_stats(type_, range_, entity, database): if type_ in ["entity", "listener"] and entity: params["entity"] = entity - if not database: + if not database and type_ != entity: today = date.today().strftime("%Y%m%d") - if type_ == "entity": - prefix = entity - elif type_ == "listeners": + if type_ == "listeners": prefix = f"{entity}_listeners" else: prefix = type_ diff --git a/listenbrainz_spark/stats/user/entity.py b/listenbrainz_spark/stats/user/entity.py index dc8f14f6b2..8c7337b8e2 100644 --- 
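Patch 08's corrected filter above is a semi-join: keep rows of the existing aggregate only for users that also appear in the incremental aggregate, without multiplying rows. The equivalent DataFrame form, as a sketch assuming the same two inputs that back the temp views:

    # left_semi keeps existing_df rows whose user_id matches some row in inc_df;
    # no columns from inc_df are added and no row is duplicated
    filtered_df = existing_df.join(
        inc_df.select("user_id").distinct(), on="user_id", how="left_semi"
    )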
a/listenbrainz_spark/stats/user/entity.py +++ b/listenbrainz_spark/stats/user/entity.py @@ -1,6 +1,6 @@ import json import logging -from datetime import datetime +from datetime import datetime, date from typing import Iterator, Optional, Dict, List from more_itertools import chunked @@ -22,7 +22,6 @@ from listenbrainz_spark.stats.user.recording import get_recordings from listenbrainz_spark.stats.user.release import get_releases from listenbrainz_spark.stats.user.release_group import get_release_groups -from listenbrainz_spark.utils import get_listens_from_dump, read_files_from_HDFS logger = logging.getLogger(__name__) @@ -144,7 +143,7 @@ def create_messages(only_inc_users, data, entity: str, stats_range: str, from_da messages: A list of messages to be sent via RabbitMQ """ if database is None: - database = f"{entity}_{stats_range}" + database = f"{entity}_{stats_range}_{date.today().strftime('%Y%m%d')}" if not only_inc_users: yield { From bbfdf96196ba0c59cb56e09fc96cfe474417fee9 Mon Sep 17 00:00:00 2001 From: Kartik Ohri Date: Tue, 7 Jan 2025 01:09:06 +0530 Subject: [PATCH 10/20] fix stats type check --- listenbrainz/spark/request_manage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/listenbrainz/spark/request_manage.py b/listenbrainz/spark/request_manage.py index fbfa1d7882..c2e5ec039e 100644 --- a/listenbrainz/spark/request_manage.py +++ b/listenbrainz/spark/request_manage.py @@ -103,7 +103,7 @@ def request_user_stats(type_, range_, entity, database): if type_ in ["entity", "listener"] and entity: params["entity"] = entity - if not database and type_ != entity: + if not database and type_ != "entity": today = date.today().strftime("%Y%m%d") if type_ == "listeners": prefix = f"{entity}_listeners" From 90ba0f10e5b23dd395c41cd1147be1d4d6cac36e Mon Sep 17 00:00:00 2001 From: Kartik Ohri Date: Wed, 8 Jan 2025 21:54:50 +0530 Subject: [PATCH 11/20] refactor user stats class --- listenbrainz_spark/path.py | 4 +- .../stats/incremental/user/artist.py | 12 +- .../stats/incremental/user/entity.py | 147 +++++------------- .../stats/incremental/user/recording.py | 26 ++-- .../stats/incremental/user/release.py | 22 +-- .../stats/incremental/user/release_group.py | 22 +-- 6 files changed, 79 insertions(+), 154 deletions(-) diff --git a/listenbrainz_spark/path.py b/listenbrainz_spark/path.py index 7f9c2d48bb..e83d30b574 100644 --- a/listenbrainz_spark/path.py +++ b/listenbrainz_spark/path.py @@ -5,11 +5,9 @@ LISTENBRAINZ_INTERMEDIATE_STATS_DIRECTORY = os.path.join('/', 'data', 'stats-new') -LISTENBRAINZ_USER_STATS_AGG_DIRECTORY = os.path.join('/', 'user_stats_aggregates') -LISTENBRAINZ_USER_STATS_BOOKKEEPING_DIRECTORY = os.path.join('/', 'user_stats_bookkeeping') - LISTENBRAINZ_BASE_STATS_DIRECTORY = os.path.join('/', 'stats') LISTENBRAINZ_SITEWIDE_STATS_DIRECTORY = os.path.join(LISTENBRAINZ_BASE_STATS_DIRECTORY, 'sitewide') +LISTENBRAINZ_USER_STATS_DIRECTORY = os.path.join(LISTENBRAINZ_SITEWIDE_STATS_DIRECTORY, 'user') # MLHD+ dump files MLHD_PLUS_RAW_DATA_DIRECTORY = os.path.join("/", "mlhd-raw") diff --git a/listenbrainz_spark/stats/incremental/user/artist.py b/listenbrainz_spark/stats/incremental/user/artist.py index c892854680..1e334db22d 100644 --- a/listenbrainz_spark/stats/incremental/user/artist.py +++ b/listenbrainz_spark/stats/incremental/user/artist.py @@ -9,18 +9,18 @@ class ArtistUserEntity(UserEntity): - def __init__(self): - super().__init__(entity="artists") + def __init__(self, stats_range): + super().__init__(entity="artists", stats_range=stats_range) def 
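Patches 09 and 10 move database naming out of request_manage for entity stats: the request side no longer pre-creates a daily database, and the spark side derives a default name when none is passed. An illustration of the default (the date is just an example):

    from datetime import date

    entity, stats_range = "artists", "week"
    database = f"{entity}_{stats_range}_{date.today().strftime('%Y%m%d')}"
    # e.g. "artists_week_20250107"; combined with the only_inc_users gating of
    # couchdb_data_start/end, incremental-only runs update the existing database
    # instead of spinning up a new one each day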
get_cache_tables(self) -> List[str]: return [ARTIST_COUNTRY_CODE_DATAFRAME] def get_partial_aggregate_schema(self): return StructType([ - StructField('user_id', IntegerType(), nullable=False), - StructField('artist_name', StringType(), nullable=False), - StructField('artist_mbid', StringType(), nullable=True), - StructField('listen_count', IntegerType(), nullable=False), + StructField("user_id", IntegerType(), nullable=False), + StructField("artist_name", StringType(), nullable=False), + StructField("artist_mbid", StringType(), nullable=True), + StructField("listen_count", IntegerType(), nullable=False), ]) def aggregate(self, table, cache_tables): diff --git a/listenbrainz_spark/stats/incremental/user/entity.py b/listenbrainz_spark/stats/incremental/user/entity.py index 18fa39b4b0..ab4758811f 100644 --- a/listenbrainz_spark/stats/incremental/user/entity.py +++ b/listenbrainz_spark/stats/incremental/user/entity.py @@ -1,46 +1,22 @@ import abc import logging -from datetime import datetime -from pathlib import Path -from typing import List - -from pyspark.errors import AnalysisException -from pyspark.sql import DataFrame -from pyspark.sql.types import StructType, StructField, TimestampType - -import listenbrainz_spark -from listenbrainz_spark import hdfs_connection -from listenbrainz_spark.config import HDFS_CLUSTER_URI -from listenbrainz_spark.path import INCREMENTAL_DUMPS_SAVE_PATH, \ - LISTENBRAINZ_USER_STATS_AGG_DIRECTORY, LISTENBRAINZ_USER_STATS_BOOKKEEPING_DIRECTORY + +from listenbrainz_spark.path import LISTENBRAINZ_USER_STATS_DIRECTORY from listenbrainz_spark.stats import run_query -from listenbrainz_spark.utils import read_files_from_HDFS, get_listens_from_dump +from listenbrainz_spark.stats.incremental import IncrementalStats +from listenbrainz_spark.utils import read_files_from_HDFS logger = logging.getLogger(__name__) -BOOKKEEPING_SCHEMA = StructType([ - StructField('from_date', TimestampType(), nullable=False), - StructField('to_date', TimestampType(), nullable=False), - StructField('created', TimestampType(), nullable=False), -]) - -class UserEntity(abc.ABC): - - def __init__(self, entity): - self.entity = entity - - def get_existing_aggregate_path(self, stats_range) -> str: - return f"{LISTENBRAINZ_USER_STATS_AGG_DIRECTORY}/{self.entity}/{stats_range}" - def get_bookkeeping_path(self, stats_range) -> str: - return f"{LISTENBRAINZ_USER_STATS_BOOKKEEPING_DIRECTORY}/{self.entity}/{stats_range}" +class UserEntity(IncrementalStats, abc.ABC): - def get_partial_aggregate_schema(self) -> StructType: - raise NotImplementedError() + def get_base_path(self) -> str: + return LISTENBRAINZ_USER_STATS_DIRECTORY - def aggregate(self, table, cache_tables) -> DataFrame: - raise NotImplementedError() + def get_table_prefix(self) -> str: + return f"user_{self.entity}_{self.stats_range}" def filter_existing_aggregate(self, existing_aggregate, incremental_aggregate): query = f""" @@ -53,88 +29,39 @@ def filter_existing_aggregate(self, existing_aggregate, incremental_aggregate): """ return run_query(query) - def combine_aggregates(self, existing_aggregate, incremental_aggregate) -> DataFrame: - raise NotImplementedError() - - def get_top_n(self, final_aggregate, N) -> DataFrame: - raise NotImplementedError() - - def get_cache_tables(self) -> List[str]: - raise NotImplementedError() - - def generate_stats(self, stats_range: str, from_date: datetime, - to_date: datetime, top_entity_limit: int): - cache_tables = [] - for idx, df_path in enumerate(self.get_cache_tables()): - df_name = 
f"entity_data_cache_{idx}" - cache_tables.append(df_name) - read_files_from_HDFS(df_path).createOrReplaceTempView(df_name) - - metadata_path = self.get_bookkeeping_path(stats_range) - try: - metadata = listenbrainz_spark \ - .session \ - .read \ - .schema(BOOKKEEPING_SCHEMA) \ - .json(f"{HDFS_CLUSTER_URI}{metadata_path}") \ - .collect()[0] - existing_from_date, existing_to_date = metadata["from_date"], metadata["to_date"] - existing_aggregate_usable = existing_from_date.date() == from_date.date() - except AnalysisException: - existing_aggregate_usable = False - logger.info("Existing partial aggregate not found!") - - prefix = f"user_{self.entity}_{stats_range}" - existing_aggregate_path = self.get_existing_aggregate_path(stats_range) - - only_inc_users = True - - if not hdfs_connection.client.status(existing_aggregate_path, strict=False) or not existing_aggregate_usable: - table = f"{prefix}_full_listens" - get_listens_from_dump(from_date, to_date, include_incremental=False).createOrReplaceTempView(table) - - logger.info("Creating partial aggregate from full dump listens") - hdfs_connection.client.makedirs(Path(existing_aggregate_path).parent) - full_df = self.aggregate(table, cache_tables) - full_df.write.mode("overwrite").parquet(existing_aggregate_path) - - hdfs_connection.client.makedirs(Path(metadata_path).parent) - metadata_df = listenbrainz_spark.session.createDataFrame( - [(from_date, to_date, datetime.now())], - schema=BOOKKEEPING_SCHEMA - ) - metadata_df.write.mode("overwrite").json(metadata_path) - only_inc_users = False - - full_df = read_files_from_HDFS(existing_aggregate_path) + def generate_stats(self, top_entity_limit: int): + self.setup_cache_tables() + prefix = self.get_table_prefix() - if hdfs_connection.client.status(INCREMENTAL_DUMPS_SAVE_PATH, strict=False): - table = f"{prefix}_incremental_listens" - read_files_from_HDFS(INCREMENTAL_DUMPS_SAVE_PATH) \ - .createOrReplaceTempView(table) - inc_df = self.aggregate(table, cache_tables) - else: - inc_df = listenbrainz_spark.session.createDataFrame([], schema=self.get_partial_aggregate_schema()) + if not self.partial_aggregate_usable(): + self.create_partial_aggregate() only_inc_users = False + else: + only_inc_users = True - full_table = f"{prefix}_existing_aggregate" - full_df.createOrReplaceTempView(full_table) + partial_df = read_files_from_HDFS(self.get_existing_aggregate_path()) + partial_table = f"{prefix}_existing_aggregate" + partial_df.createOrReplaceTempView(partial_table) - inc_table = f"{prefix}_incremental_aggregate" - inc_df.createOrReplaceTempView(inc_table) + if self.incremental_dump_exists(): + inc_df = self.create_incremental_aggregate() + inc_table = f"{prefix}_incremental_aggregate" + inc_df.createOrReplaceTempView(inc_table) - if only_inc_users: - existing_table = f"{prefix}_filtered_aggregate" - filtered_aggregate_df = self.filter_existing_aggregate(full_table, inc_table) - filtered_aggregate_df.createOrReplaceTempView(existing_table) + if only_inc_users: + filtered_aggregate_df = self.filter_existing_aggregate(partial_table, inc_table) + filtered_table = f"{prefix}_filtered_aggregate" + filtered_aggregate_df.createOrReplaceTempView(filtered_table) + else: + filtered_table = partial_table + + final_df = self.combine_aggregates(filtered_table, inc_table) else: - existing_table = full_table + final_df = partial_df + only_inc_users = False - combined_df = self.combine_aggregates(existing_table, inc_table) - - combined_table = f"{prefix}_combined_aggregate" - 
combined_df.createOrReplaceTempView(combined_table) - results_df = self.get_top_n(combined_table, top_entity_limit) + final_table = f"{prefix}_final_aggregate" + final_df.createOrReplaceTempView(final_table) - return only_inc_users, results_df.toLocalIterator() - \ No newline at end of file + results_df = self.get_top_n(final_table, top_entity_limit) + return self.from_date, self.to_date, only_inc_users, results_df.toLocalIterator() diff --git a/listenbrainz_spark/stats/incremental/user/recording.py b/listenbrainz_spark/stats/incremental/user/recording.py index a04840ac6b..08bac15b68 100644 --- a/listenbrainz_spark/stats/incremental/user/recording.py +++ b/listenbrainz_spark/stats/incremental/user/recording.py @@ -11,25 +11,25 @@ class RecordingUserEntity(UserEntity): - def __init__(self): - super().__init__(entity="recordings") + def __init__(self, stats_range): + super().__init__(entity="recordings", stats_range=stats_range) def get_cache_tables(self) -> List[str]: return [RECORDING_ARTIST_DATAFRAME, RELEASE_METADATA_CACHE_DATAFRAME] def get_partial_aggregate_schema(self): return StructType([ - StructField('user_id', IntegerType(), nullable=False), - StructField('recording_name', StringType(), nullable=False), - StructField('recording_mbid', StringType(), nullable=True), - StructField('artist_name', StringType(), nullable=False), - StructField('artist_credit_mbids', ArrayType(StringType()), nullable=True), - StructField('release_name', StringType(), nullable=True), - StructField('release_mbid', StringType(), nullable=True), - StructField('artists', artists_column_schema, nullable=True), - StructField('caa_id', IntegerType(), nullable=True), - StructField('caa_release_mbid', StringType(), nullable=True), - StructField('listen_count', IntegerType(), nullable=False), + StructField("user_id", IntegerType(), nullable=False), + StructField("recording_name", StringType(), nullable=False), + StructField("recording_mbid", StringType(), nullable=True), + StructField("artist_name", StringType(), nullable=False), + StructField("artist_credit_mbids", ArrayType(StringType()), nullable=True), + StructField("release_name", StringType(), nullable=True), + StructField("release_mbid", StringType(), nullable=True), + StructField("artists", artists_column_schema, nullable=True), + StructField("caa_id", IntegerType(), nullable=True), + StructField("caa_release_mbid", StringType(), nullable=True), + StructField("listen_count", IntegerType(), nullable=False), ]) def aggregate(self, table, cache_tables): diff --git a/listenbrainz_spark/stats/incremental/user/release.py b/listenbrainz_spark/stats/incremental/user/release.py index fd0886b662..f5e290f4fe 100644 --- a/listenbrainz_spark/stats/incremental/user/release.py +++ b/listenbrainz_spark/stats/incremental/user/release.py @@ -10,23 +10,23 @@ class ReleaseUserEntity(UserEntity): - def __init__(self): - super().__init__(entity="releases") + def __init__(self, stats_range): + super().__init__(entity="releases", stats_range=stats_range) def get_cache_tables(self) -> List[str]: return [RELEASE_METADATA_CACHE_DATAFRAME] def get_partial_aggregate_schema(self): return StructType([ - StructField('user_id', IntegerType(), nullable=False), - StructField('release_name', StringType(), nullable=False), - StructField('release_mbid', StringType(), nullable=False), - StructField('artist_name', StringType(), nullable=False), - StructField('artist_credit_mbids', ArrayType(StringType()), nullable=False), - StructField('artists', artists_column_schema, nullable=True), - 
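The rewritten generate_stats above delegates to an IncrementalStats base class whose source is not part of this series. Inferred purely from the call sites, its contract looks roughly like this; method names are taken from the calls, everything else is an assumption:

    import abc

    class IncrementalStats(abc.ABC):
        # inferred: the base class owns cache setup, bookkeeping and the HDFS paths;
        # subclasses contribute the entity-specific SQL and schema
        def __init__(self, entity: str, stats_range: str):
            self.entity, self.stats_range = entity, stats_range
            # self.from_date / self.to_date presumably derived from stats_range

        @abc.abstractmethod
        def get_base_path(self) -> str: ...

        @abc.abstractmethod
        def get_table_prefix(self) -> str: ...

        def setup_cache_tables(self): ...                # register metadata caches as temp views
        def partial_aggregate_usable(self) -> bool: ...  # do bookkeeping dates match this range?
        def create_partial_aggregate(self): ...          # aggregate full-dump listens and persist
        def incremental_dump_exists(self) -> bool: ...
        def create_incremental_aggregate(self): ...      # aggregate just the incremental listens
        def get_existing_aggregate_path(self) -> str: ...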
StructField('caa_id', IntegerType(), nullable=True), - StructField('caa_release_mbid', StringType(), nullable=True), - StructField('listen_count', IntegerType(), nullable=False), + StructField("user_id", IntegerType(), nullable=False), + StructField("release_name", StringType(), nullable=False), + StructField("release_mbid", StringType(), nullable=False), + StructField("artist_name", StringType(), nullable=False), + StructField("artist_credit_mbids", ArrayType(StringType()), nullable=False), + StructField("artists", artists_column_schema, nullable=True), + StructField("caa_id", IntegerType(), nullable=True), + StructField("caa_release_mbid", StringType(), nullable=True), + StructField("listen_count", IntegerType(), nullable=False), ]) def aggregate(self, table, cache_tables): diff --git a/listenbrainz_spark/stats/incremental/user/release_group.py b/listenbrainz_spark/stats/incremental/user/release_group.py index 115834913f..aa448d8f0d 100644 --- a/listenbrainz_spark/stats/incremental/user/release_group.py +++ b/listenbrainz_spark/stats/incremental/user/release_group.py @@ -11,23 +11,23 @@ class ReleaseGroupUserEntity(UserEntity): - def __init__(self): - super().__init__(entity="release_groups") + def __init__(self, stats_range): + super().__init__(entity="release_groups", stats_range=stats_range) def get_cache_tables(self) -> List[str]: return [RELEASE_METADATA_CACHE_DATAFRAME, RELEASE_GROUP_METADATA_CACHE_DATAFRAME] def get_partial_aggregate_schema(self): return StructType([ - StructField('user_id', IntegerType(), nullable=False), - StructField('release_group_name', StringType(), nullable=False), - StructField('release_group_mbid', StringType(), nullable=False), - StructField('artist_name', StringType(), nullable=False), - StructField('artist_credit_mbids', ArrayType(StringType()), nullable=False), - StructField('artists', artists_column_schema, nullable=True), - StructField('caa_id', IntegerType(), nullable=True), - StructField('caa_release_mbid', StringType(), nullable=True), - StructField('listen_count', IntegerType(), nullable=False), + StructField("user_id", IntegerType(), nullable=False), + StructField("release_group_name", StringType(), nullable=False), + StructField("release_group_mbid", StringType(), nullable=False), + StructField("artist_name", StringType(), nullable=False), + StructField("artist_credit_mbids", ArrayType(StringType()), nullable=False), + StructField("artists", artists_column_schema, nullable=True), + StructField("caa_id", IntegerType(), nullable=True), + StructField("caa_release_mbid", StringType(), nullable=True), + StructField("listen_count", IntegerType(), nullable=False), ]) def aggregate(self, table, cache_tables): From 71851be6f11c32a4f80b0f3a86c8cedf6b05ad1f Mon Sep 17 00:00:00 2001 From: Kartik Ohri Date: Thu, 9 Jan 2025 01:46:08 +0530 Subject: [PATCH 12/20] fix UserEntity usage --- listenbrainz_spark/stats/user/entity.py | 42 ++++++------------------- 1 file changed, 10 insertions(+), 32 deletions(-) diff --git a/listenbrainz_spark/stats/user/entity.py b/listenbrainz_spark/stats/user/entity.py index 8c7337b8e2..4e8408e49e 100644 --- a/listenbrainz_spark/stats/user/entity.py +++ b/listenbrainz_spark/stats/user/entity.py @@ -1,7 +1,7 @@ import json import logging from datetime import datetime, date -from typing import Iterator, Optional, Dict, List +from typing import Iterator, Optional, Dict from more_itertools import chunked from pydantic import ValidationError @@ -12,7 +12,6 @@ from data.model.user_release_stat import ReleaseRecord from 
listenbrainz_spark.path import RELEASE_METADATA_CACHE_DATAFRAME, ARTIST_COUNTRY_CODE_DATAFRAME, \ RELEASE_GROUP_METADATA_CACHE_DATAFRAME, RECORDING_ARTIST_DATAFRAME -from listenbrainz_spark.stats import get_dates_for_stats_range from listenbrainz_spark.stats.incremental.user.artist import ArtistUserEntity from listenbrainz_spark.stats.incremental.user.recording import RecordingUserEntity from listenbrainz_spark.stats.incremental.user.release import ReleaseUserEntity @@ -46,11 +45,11 @@ "release_groups": [RELEASE_METADATA_CACHE_DATAFRAME, RELEASE_GROUP_METADATA_CACHE_DATAFRAME] } -incremental_entity_obj_map = { - "artists": ArtistUserEntity(), - "releases": ReleaseUserEntity(), - "recordings": RecordingUserEntity(), - "release_groups": ReleaseGroupUserEntity(), +incremental_entity_map = { + "artists": ArtistUserEntity, + "releases": ReleaseUserEntity, + "recordings": RecordingUserEntity, + "release_groups": ReleaseGroupUserEntity, } NUMBER_OF_TOP_ENTITIES = 1000 # number of top entities to retain for user stats @@ -62,36 +61,14 @@ def get_entity_stats(entity: str, stats_range: str, message_type: str = "user_en """ Get the top entity for all users for specified stats_range """ logger.debug(f"Calculating user_{entity}_{stats_range}...") - from_date, to_date = get_dates_for_stats_range(stats_range) - messages = get_entity_stats_for_range( - entity, - stats_range, - from_date, - to_date, - message_type, - database - ) - - logger.debug("Done!") - return messages - - -def get_entity_stats_for_range( - entity: str, - stats_range: str, - from_date: datetime, - to_date: datetime, - message_type: str, - database: str = None -): - """ Calculate entity stats for all users' listens between the start and the end datetime. """ if message_type == "year_in_music_top_stats": number_of_results = NUMBER_OF_YIM_ENTITIES else: number_of_results = NUMBER_OF_TOP_ENTITIES - entity_obj = incremental_entity_obj_map[entity] - only_inc_users, data = entity_obj.generate_stats(stats_range, from_date, to_date, number_of_results) + entity_cls = incremental_entity_map[entity] + entity_obj = entity_cls(stats_range) + from_date, to_date, only_inc_users, data = entity_obj.generate_stats(number_of_results) return create_messages(only_inc_users, data=data, entity=entity, stats_range=stats_range, from_date=from_date, to_date=to_date, message_type=message_type, database=database) @@ -129,6 +106,7 @@ def create_messages(only_inc_users, data, entity: str, stats_range: str, from_da Create messages to send the data to the webserver via RabbitMQ Args: + only_inc_users: whether stats were generated only for users with listens present in incremental dumps data: Data to sent to the webserver entity: The entity for which statistics are calculated, i.e 'artists', 'releases' or 'recordings' From 668eac994bc1fc0c331349efd8ca164db42f7a55 Mon Sep 17 00:00:00 2001 From: Kartik Ohri Date: Thu, 9 Jan 2025 15:16:41 +0530 Subject: [PATCH 13/20] Refactor create messages and stats validation into class --- .../stats/incremental/user/artist.py | 4 +- .../stats/incremental/user/daily_activity.py | 113 ++++++++++++++++ .../stats/incremental/user/entity.py | 97 ++++++++++++- .../incremental/user/listening_activity.py | 96 +++++++++++++ .../stats/incremental/user/recording.py | 4 +- .../stats/incremental/user/release.py | 4 +- .../stats/incremental/user/release_group.py | 4 +- .../stats/user/daily_activity.py | 128 +----------------- listenbrainz_spark/stats/user/entity.py | 127 +---------------- .../stats/user/listening_activity.py | 114 +--------------- 
10 files changed, 322 insertions(+), 369 deletions(-) create mode 100644 listenbrainz_spark/stats/incremental/user/daily_activity.py create mode 100644 listenbrainz_spark/stats/incremental/user/listening_activity.py diff --git a/listenbrainz_spark/stats/incremental/user/artist.py b/listenbrainz_spark/stats/incremental/user/artist.py index 1e334db22d..bf3f69694e 100644 --- a/listenbrainz_spark/stats/incremental/user/artist.py +++ b/listenbrainz_spark/stats/incremental/user/artist.py @@ -9,8 +9,8 @@ class ArtistUserEntity(UserEntity): - def __init__(self, stats_range): - super().__init__(entity="artists", stats_range=stats_range) + def __init__(self, stats_range, database, message_type): + super().__init__(entity="artists", stats_range=stats_range, database=database, message_type=message_type) def get_cache_tables(self) -> List[str]: return [ARTIST_COUNTRY_CODE_DATAFRAME] diff --git a/listenbrainz_spark/stats/incremental/user/daily_activity.py b/listenbrainz_spark/stats/incremental/user/daily_activity.py new file mode 100644 index 0000000000..5f3f1eb2cf --- /dev/null +++ b/listenbrainz_spark/stats/incremental/user/daily_activity.py @@ -0,0 +1,113 @@ +import calendar +import itertools +import json +import logging +from typing import List + +from pydantic import ValidationError +from pyspark.sql.types import StructType, StructField, StringType, IntegerType + +import listenbrainz_spark +from data.model.common_stat_spark import UserStatRecords +from data.model.user_daily_activity import DailyActivityRecord +from listenbrainz_spark.stats import run_query +from listenbrainz_spark.stats.incremental.user.entity import UserEntity + +logger = logging.getLogger(__name__) + + +class DailyActivityUserEntity(UserEntity): + + def __init__(self, stats_range, database, message_type): + super().__init__(entity="daily_activity", stats_range=stats_range, database=database, message_type=message_type) + self.setup_time_range() + + def setup_time_range(self): + # Genarate a dataframe containing hours of all days of the week + weekdays = [calendar.day_name[day] for day in range(0, 7)] + hours = [hour for hour in range(0, 24)] + time_range = itertools.product(weekdays, hours) + time_range_df = listenbrainz_spark.session.createDataFrame(time_range, schema=["day", "hour"]) + time_range_df.createOrReplaceTempView("time_range") + + def get_cache_tables(self) -> List[str]: + return [] + + def get_partial_aggregate_schema(self): + return StructType([ + StructField("user_id", IntegerType(), nullable=False), + StructField("day", StringType(), nullable=False), + StructField("hour", IntegerType(), nullable=False), + StructField("listen_count", IntegerType(), nullable=False), + ]) + + def aggregate(self, table, cache_tables): + result = run_query(f""" + SELECT user_id + , date_format(listened_at, 'EEEE') as day + , date_format(listened_at, 'H') as hour + , count(listened_at) AS listen_count + FROM {table} + GROUP BY user_id + , day + , hour + """) + return result + + def combine_aggregates(self, existing_aggregate, incremental_aggregate): + query = f""" + WITH intermediate_table AS ( + SELECT user_id + , day + , hour + , listen_count + FROM {existing_aggregate} + UNION ALL + SELECT user_id + , day + , hour + , listen_count + FROM {incremental_aggregate} + ) + SELECT user_id + , day + , hour + , sum(listen_count) as listen_count + FROM intermediate_table + GROUP BY user_id + , day + , hour + """ + return run_query(query) + + def get_top_n(self, final_aggregate, N): + query = f""" + SELECT user_id + , sort_array( + collect_list( 
+ struct( + day + , hour + , COALESCE(listen_count, 0) AS listen_count + ) + ) + ) AS daily_activity + FROM time_range + LEFT JOIN {final_aggregate} + USING (day, hour) + GROUP BY user_id + """ + return run_query(query) + + def parse_one_user_stats(self, entry: dict): + try: + UserStatRecords[DailyActivityRecord]( + user_id=entry["user_id"], + data=entry["daily_activity"] + ) + return { + "user_id": entry["user_id"], + "data": entry["daily_activity"] + } + except ValidationError: + logger.error("Invalid entry in entity stats:", exc_info=True) diff --git a/listenbrainz_spark/stats/incremental/user/entity.py b/listenbrainz_spark/stats/incremental/user/entity.py index ab4758811f..1f9211dd4b 100644 --- a/listenbrainz_spark/stats/incremental/user/entity.py +++ b/listenbrainz_spark/stats/incremental/user/entity.py @@ -1,17 +1,44 @@ import abc +import json import logging +from datetime import date +from typing import Optional, Iterator, Dict, Tuple +from more_itertools import chunked +from pydantic import ValidationError +from pyspark.sql import DataFrame + +from data.model.user_artist_stat import ArtistRecord +from data.model.user_recording_stat import RecordingRecord +from data.model.user_release_group_stat import ReleaseGroupRecord +from data.model.user_release_stat import ReleaseRecord from listenbrainz_spark.path import LISTENBRAINZ_USER_STATS_DIRECTORY from listenbrainz_spark.stats import run_query from listenbrainz_spark.stats.incremental import IncrementalStats +from listenbrainz_spark.stats.user import USERS_PER_MESSAGE from listenbrainz_spark.utils import read_files_from_HDFS logger = logging.getLogger(__name__) +entity_model_map = { + "artists": ArtistRecord, + "releases": ReleaseRecord, + "recordings": RecordingRecord, + "release_groups": ReleaseGroupRecord, +} + class UserEntity(IncrementalStats, abc.ABC): + def __init__(self, entity: str, stats_range: str, database: Optional[str], message_type: Optional[str]): + super().__init__(entity, stats_range) + if database: + self.database = database + else: + self.database = f"{self.entity}_{self.stats_range}_{date.today().strftime('%Y%m%d')}" + self.message_type = message_type + def get_base_path(self) -> str: return LISTENBRAINZ_USER_STATS_DIRECTORY @@ -29,7 +56,7 @@ def filter_existing_aggregate(self, existing_aggregate, incremental_aggregate): """ return run_query(query) - def generate_stats(self, top_entity_limit: int): + def generate_stats(self, top_entity_limit: int) -> Tuple[bool, DataFrame]: self.setup_cache_tables() prefix = self.get_table_prefix() @@ -64,4 +91,70 @@ def generate_stats(self, top_entity_limit: int): final_df.createOrReplaceTempView(final_table) results_df = self.get_top_n(final_table, top_entity_limit) - return self.from_date, self.to_date, only_inc_users, results_df.toLocalIterator() + return only_inc_users, results_df + + def parse_one_user_stats(self, entry: dict): + count_key = self.entity + "_count" + total_entity_count = entry[count_key] + + entity_list = [] + for item in entry[self.entity]: + try: + entity_model_map[self.entity](**item) + entity_list.append(item) + except ValidationError: + logger.error("Invalid entry in entity stats:", exc_info=True) + total_entity_count -= 1 + + return { + "user_id": entry["user_id"], + "data": entity_list, + "count": total_entity_count + } + + def create_messages(self, only_inc_users, results: DataFrame) -> Iterator[Dict]: + """ + Create messages to send the data to the webserver via RabbitMQ + + Args: + only_inc_users: whether stats were generated only for users with 
listens present in incremental dumps + results: Data to send to the webserver + """ + if not only_inc_users: + yield { + "type": "couchdb_data_start", + "database": self.database + } + + from_ts = int(self.from_date.timestamp()) + to_ts = int(self.to_date.timestamp()) + + data = results.toLocalIterator() + for entries in chunked(data, USERS_PER_MESSAGE): + multiple_user_stats = [] + for entry in entries: + row = entry.asDict(recursive=True) + processed_stat = self.parse_one_user_stats(row) + if processed_stat is not None: + multiple_user_stats.append(processed_stat) + + yield { + "type": self.message_type, + "stats_range": self.stats_range, + "from_ts": from_ts, + "to_ts": to_ts, + "entity": self.entity, + "data": multiple_user_stats, + "database": self.database + } + + if not only_inc_users: + yield { + "type": "couchdb_data_end", + "database": self.database + } + + def main(self, top_entity_limit: int): + only_inc_users, results = self.generate_stats(top_entity_limit) + itr = self.create_messages(only_inc_users, results) + return itr diff --git a/listenbrainz_spark/stats/incremental/user/listening_activity.py b/listenbrainz_spark/stats/incremental/user/listening_activity.py new file mode 100644 index 0000000000..68768b1f12 --- /dev/null +++ b/listenbrainz_spark/stats/incremental/user/listening_activity.py @@ -0,0 +1,96 @@ +import logging +from typing import List + +from pydantic import ValidationError +from pyspark.sql.types import StructType, StructField, StringType, IntegerType + +from data.model.common_stat_spark import UserStatRecords +from data.model.user_listening_activity import ListeningActivityRecord +from listenbrainz_spark.stats import run_query +from listenbrainz_spark.stats.common.listening_activity import setup_time_range +from listenbrainz_spark.stats.incremental.user.entity import UserEntity + +logger = logging.getLogger(__name__) + + +class ListeningActivityUserEntity(UserEntity): + + def __init__(self, stats_range, database, message_type): + super().__init__(entity="listening_activity", stats_range=stats_range, database=database, message_type=message_type) + self.from_date, self.to_date, _, __, self.spark_date_format = setup_time_range(stats_range) + + def get_cache_tables(self) -> List[str]: + return [] + + def get_partial_aggregate_schema(self): + return StructType([ + StructField("user_id", IntegerType(), nullable=False), + StructField("time_range", StringType(), nullable=False), + StructField("listen_count", IntegerType(), nullable=False), + ]) + + def aggregate(self, table, cache_tables): + result = run_query(f""" + SELECT user_id + , date_format(listened_at, '{self.spark_date_format}') AS time_range + , count(listened_at) AS listen_count + FROM {table} + GROUP BY user_id + , time_range + """) + return result + + def combine_aggregates(self, existing_aggregate, incremental_aggregate): + query = f""" + WITH intermediate_table AS ( + SELECT user_id + , time_range + , listen_count + FROM {existing_aggregate} + UNION ALL + SELECT user_id + , time_range + , listen_count + FROM {incremental_aggregate} + ) + SELECT user_id + , time_range + , sum(listen_count) as listen_count + FROM intermediate_table + GROUP BY user_id + , time_range + """ + return run_query(query) + + def get_top_n(self, final_aggregate, N): + query = f""" + SELECT user_id + , sort_array( + collect_list( + struct( + to_unix_timestamp(start) AS from_ts + , to_unix_timestamp(end) AS to_ts + , time_range + , COALESCE(listen_count, 0) AS listen_count + ) + ) + ) AS listening_activity + FROM time_range + LEFT
JOIN {final_aggregate} + USING (time_range) + GROUP BY user_id + """ + return run_query(query) + + def parse_one_user_stats(self, entry: dict): + try: + UserStatRecords[ListeningActivityRecord]( + user_id=entry["user_id"], + data=entry["listening_activity"] + ) + return { + "user_id": entry["user_id"], + "data": entry["listening_activity"] + } + except ValidationError: + logger.error("Invalid entry in entity stats:", exc_info=True) diff --git a/listenbrainz_spark/stats/incremental/user/recording.py b/listenbrainz_spark/stats/incremental/user/recording.py index 08bac15b68..994bd5b4a9 100644 --- a/listenbrainz_spark/stats/incremental/user/recording.py +++ b/listenbrainz_spark/stats/incremental/user/recording.py @@ -11,8 +11,8 @@ class RecordingUserEntity(UserEntity): - def __init__(self, stats_range): - super().__init__(entity="recordings", stats_range=stats_range) + def __init__(self, stats_range, database, message_type): + super().__init__(entity="recordings", stats_range=stats_range, database=database, message_type=message_type) def get_cache_tables(self) -> List[str]: return [RECORDING_ARTIST_DATAFRAME, RELEASE_METADATA_CACHE_DATAFRAME] diff --git a/listenbrainz_spark/stats/incremental/user/release.py b/listenbrainz_spark/stats/incremental/user/release.py index f5e290f4fe..6119c474cf 100644 --- a/listenbrainz_spark/stats/incremental/user/release.py +++ b/listenbrainz_spark/stats/incremental/user/release.py @@ -10,8 +10,8 @@ class ReleaseUserEntity(UserEntity): - def __init__(self, stats_range): - super().__init__(entity="releases", stats_range=stats_range) + def __init__(self, stats_range, database, message_type): + super().__init__(entity="releases", stats_range=stats_range, database=database, message_type=message_type) def get_cache_tables(self) -> List[str]: return [RELEASE_METADATA_CACHE_DATAFRAME] diff --git a/listenbrainz_spark/stats/incremental/user/release_group.py b/listenbrainz_spark/stats/incremental/user/release_group.py index aa448d8f0d..535bffcc69 100644 --- a/listenbrainz_spark/stats/incremental/user/release_group.py +++ b/listenbrainz_spark/stats/incremental/user/release_group.py @@ -11,8 +11,8 @@ class ReleaseGroupUserEntity(UserEntity): - def __init__(self, stats_range): - super().__init__(entity="release_groups", stats_range=stats_range) + def __init__(self, stats_range, database, message_type): + super().__init__(entity="release_groups", stats_range=stats_range, database=database, message_type=message_type) def get_cache_tables(self) -> List[str]: return [RELEASE_METADATA_CACHE_DATAFRAME, RELEASE_GROUP_METADATA_CACHE_DATAFRAME] diff --git a/listenbrainz_spark/stats/user/daily_activity.py b/listenbrainz_spark/stats/user/daily_activity.py index 5ef77a6fe7..77576f4086 100644 --- a/listenbrainz_spark/stats/user/daily_activity.py +++ b/listenbrainz_spark/stats/user/daily_activity.py @@ -1,136 +1,14 @@ -import itertools -import json -import calendar import logging -from datetime import datetime from typing import Iterator, Optional, Dict -from more_itertools import chunked -from pydantic import ValidationError - -import listenbrainz_spark -from data.model.common_stat_spark import UserStatRecords, StatMessage -from data.model.user_daily_activity import DailyActivityRecord -from listenbrainz_spark.stats import run_query, get_dates_for_stats_range -from listenbrainz_spark.stats.user import USERS_PER_MESSAGE -from listenbrainz_spark.utils import get_listens_from_dump -from pyspark.sql.functions import collect_list, sort_array, struct +from 
listenbrainz_spark.stats.incremental.user.daily_activity import DailyActivityUserEntity logger = logging.getLogger(__name__) -def calculate_daily_activity(): - """ Calculate number of listens for each user in each hour. """ - - # Genarate a dataframe containing hours of all days of the week - weekdays = [calendar.day_name[day] for day in range(0, 7)] - hours = [hour for hour in range(0, 24)] - time_range = itertools.product(weekdays, hours) - time_range_df = listenbrainz_spark.session.createDataFrame(time_range, schema=["day", "hour"]) - time_range_df.createOrReplaceTempView("time_range") - - # Truncate listened_at to day and hour to improve matching speed - formatted_listens = run_query(""" - SELECT user_id - , date_format(listened_at, 'EEEE') as day - , date_format(listened_at, 'H') as hour - FROM listens - """) - - formatted_listens.createOrReplaceTempView("listens") - - # Calculate the number of listens in each time range for each user except the time ranges which have zero listens. - result = run_query(""" - SELECT listens.user_id - , time_range.day - , time_range.hour - , count(*) as listen_count - FROM listens - JOIN time_range - ON listens.day == time_range.day - AND listens.hour == time_range.hour - GROUP BY listens.user_id - , time_range.day - , time_range.hour - """) - - # Create a table with a list of time ranges and corresponding listen count for each user - iterator = result \ - .withColumn("daily_activity", struct("hour", "day", "listen_count")) \ - .groupBy("user_id") \ - .agg(sort_array(collect_list("daily_activity")).alias("daily_activity")) \ - .toLocalIterator() - - return iterator - - def get_daily_activity(stats_range: str, database: str = None) -> Iterator[Optional[Dict]]: """ Calculate number of listens for an user for the specified time range """ logger.debug(f"Calculating daily_activity_{stats_range}") - - from_date, to_date = get_dates_for_stats_range(stats_range) - get_listens_from_dump(from_date, to_date).createOrReplaceTempView("listens") - data = calculate_daily_activity() - messages = create_messages(data=data, stats_range=stats_range, from_date=from_date, - to_date=to_date, database=database) - logger.debug("Done!") - - return messages - - -def create_messages(data, stats_range: str, from_date: datetime, to_date: datetime, database: str = None) \ - -> Iterator[Optional[Dict]]: - """ - Create messages to send the data to webserver via RabbitMQ - - Args: - data: Data to send to webserver - stats_range: The range for which the statistics have been calculated - from_date: The start time of the stats - to_date: The end time of the stats - database: the name of the database in which the webserver should store the data - Returns: - messages: A list of messages to be sent via RabbitMQ - """ - if database is None: - database = f"daily_activity_{stats_range}" - - yield { - "type": "couchdb_data_start", - "database": database - } - - from_ts = int(from_date.timestamp()) - to_ts = int(to_date.timestamp()) - - for entries in chunked(data, USERS_PER_MESSAGE): - multiple_user_stats = [] - for entry in entries: - _dict = entry.asDict(recursive=True) - try: - UserStatRecords[DailyActivityRecord]( - user_id=_dict["user_id"], - data=_dict["daily_activity"] - ) - multiple_user_stats.append({ - "user_id": _dict["user_id"], - "data": _dict["daily_activity"] - }) - except ValidationError: - logger.error(f"""ValidationError while calculating {stats_range} daily_activity for user: - {_dict["user_id"]}. 
Data: {json.dumps(_dict, indent=3)}""", exc_info=True) - - yield { - "type": "user_daily_activity", - "stats_range": stats_range, - "from_ts": from_ts, - "to_ts": to_ts, - "data": multiple_user_stats, - "database": database - } - - yield { - "type": "couchdb_data_end", - "database": database - } + entity_obj = DailyActivityUserEntity(stats_range, database, message_type="user_daily_activity") + return entity_obj.main(0) diff --git a/listenbrainz_spark/stats/user/entity.py b/listenbrainz_spark/stats/user/entity.py index 4e8408e49e..0eaf680886 100644 --- a/listenbrainz_spark/stats/user/entity.py +++ b/listenbrainz_spark/stats/user/entity.py @@ -1,50 +1,13 @@ -import json import logging -from datetime import datetime, date from typing import Iterator, Optional, Dict -from more_itertools import chunked -from pydantic import ValidationError - -from data.model.user_artist_stat import ArtistRecord -from data.model.user_recording_stat import RecordingRecord -from data.model.user_release_group_stat import ReleaseGroupRecord -from data.model.user_release_stat import ReleaseRecord -from listenbrainz_spark.path import RELEASE_METADATA_CACHE_DATAFRAME, ARTIST_COUNTRY_CODE_DATAFRAME, \ - RELEASE_GROUP_METADATA_CACHE_DATAFRAME, RECORDING_ARTIST_DATAFRAME from listenbrainz_spark.stats.incremental.user.artist import ArtistUserEntity from listenbrainz_spark.stats.incremental.user.recording import RecordingUserEntity from listenbrainz_spark.stats.incremental.user.release import ReleaseUserEntity from listenbrainz_spark.stats.incremental.user.release_group import ReleaseGroupUserEntity -from listenbrainz_spark.stats.user import USERS_PER_MESSAGE -from listenbrainz_spark.stats.user.artist import get_artists -from listenbrainz_spark.stats.user.recording import get_recordings -from listenbrainz_spark.stats.user.release import get_releases -from listenbrainz_spark.stats.user.release_group import get_release_groups logger = logging.getLogger(__name__) -entity_handler_map = { - "artists": get_artists, - "releases": get_releases, - "recordings": get_recordings, - "release_groups": get_release_groups, -} - -entity_model_map = { - "artists": ArtistRecord, - "releases": ReleaseRecord, - "recordings": RecordingRecord, - "release_groups": ReleaseGroupRecord, -} - -entity_cache_map = { - "artists": [ARTIST_COUNTRY_CODE_DATAFRAME], - "releases": [RELEASE_METADATA_CACHE_DATAFRAME], - "recordings": [RECORDING_ARTIST_DATAFRAME, RELEASE_METADATA_CACHE_DATAFRAME], - "release_groups": [RELEASE_METADATA_CACHE_DATAFRAME, RELEASE_GROUP_METADATA_CACHE_DATAFRAME] -} - incremental_entity_map = { "artists": ArtistUserEntity, "releases": ReleaseUserEntity, @@ -56,7 +19,7 @@ NUMBER_OF_YIM_ENTITIES = 50 # number of top entities to retain for Year in Music stats -def get_entity_stats(entity: str, stats_range: str, message_type: str = "user_entity", database: str = None)\ +def get_entity_stats(entity: str, stats_range: str, message_type: str = "user_entity", database: str = None) \ -> Iterator[Optional[Dict]]: """ Get the top entity for all users for specified stats_range """ logger.debug(f"Calculating user_{entity}_{stats_range}...") @@ -67,89 +30,5 @@ def get_entity_stats(entity: str, stats_range: str, message_type: str = "user_en number_of_results = NUMBER_OF_TOP_ENTITIES entity_cls = incremental_entity_map[entity] - entity_obj = entity_cls(stats_range) - from_date, to_date, only_inc_users, data = entity_obj.generate_stats(number_of_results) - return create_messages(only_inc_users, data=data, entity=entity, stats_range=stats_range, 
from_date=from_date, - to_date=to_date, message_type=message_type, database=database) - - -def parse_one_user_stats(entry, entity: str, stats_range: str): - _dict = entry.asDict(recursive=True) - count_key = entity + "_count" - total_entity_count = _dict[count_key] - - entity_list = [] - for item in _dict[entity]: - try: - entity_model_map[entity](**item) - entity_list.append(item) - except ValidationError: - logger.error("Invalid entry in entity stats", exc_info=True) - total_entity_count -= 1 - - try: - return { - "user_id": _dict["user_id"], - "data": entity_list, - "count": total_entity_count - } - except ValidationError: - logger.error(f"""ValidationError while calculating {stats_range} top {entity} for user: - {_dict["user_id"]}. Data: {json.dumps(_dict, indent=3)}""", exc_info=True) - return None - - -def create_messages(only_inc_users, data, entity: str, stats_range: str, from_date: datetime, to_date: datetime, - message_type: str, database: str = None) \ - -> Iterator[Optional[Dict]]: - """ - Create messages to send the data to the webserver via RabbitMQ - - Args: - only_inc_users: whether stats were generated only for users with listens present in incremental dumps - data: Data to sent to the webserver - entity: The entity for which statistics are calculated, i.e 'artists', - 'releases' or 'recordings' - stats_range: The range for which the statistics have been calculated - from_date: The start time of the stats - to_date: The end time of the stats - message_type: used to decide which handler on LB webserver side should - handle this message. can be "user_entity" or "year_in_music_top_stats" - database: the name of the database in which the webserver should store the data - - Returns: - messages: A list of messages to be sent via RabbitMQ - """ - if database is None: - database = f"{entity}_{stats_range}_{date.today().strftime('%Y%m%d')}" - - if not only_inc_users: - yield { - "type": "couchdb_data_start", - "database": database - } - - from_ts = int(from_date.timestamp()) - to_ts = int(to_date.timestamp()) - - for entries in chunked(data, USERS_PER_MESSAGE): - multiple_user_stats = [] - for entry in entries: - processed_stat = parse_one_user_stats(entry, entity, stats_range) - multiple_user_stats.append(processed_stat) - - yield { - "type": message_type, - "stats_range": stats_range, - "from_ts": from_ts, - "to_ts": to_ts, - "entity": entity, - "data": multiple_user_stats, - "database": database - } - - if not only_inc_users: - yield { - "type": "couchdb_data_end", - "database": database - } + entity_obj = entity_cls(stats_range, database, message_type) + return entity_obj.main(number_of_results) diff --git a/listenbrainz_spark/stats/user/listening_activity.py b/listenbrainz_spark/stats/user/listening_activity.py index b95d82820e..bd3c508143 100644 --- a/listenbrainz_spark/stats/user/listening_activity.py +++ b/listenbrainz_spark/stats/user/listening_activity.py @@ -1,6 +1,6 @@ import json import logging -from datetime import datetime +from datetime import datetime, date from typing import Iterator, Optional, Dict from more_itertools import chunked @@ -10,54 +10,13 @@ from data.model.user_listening_activity import ListeningActivityRecord from listenbrainz_spark.stats import run_query from listenbrainz_spark.stats.common.listening_activity import setup_time_range +from listenbrainz_spark.stats.incremental.user.listening_activity import ListeningActivityUserEntity from listenbrainz_spark.stats.user import USERS_PER_MESSAGE from listenbrainz_spark.utils import get_listens_from_dump 
logger = logging.getLogger(__name__) - -def calculate_listening_activity(): - """ Calculate number of listens for each user in time ranges given in the "time_range" table. - The time ranges are as follows: - 1) week - each day with weekday name of the past 2 weeks - 2) month - each day the past 2 months - 3) quarter - each week of past 2 quarters - 4) half_yearly - each month of past 2 half-years - 5) year - each month of the past 2 years - 4) all_time - each year starting from LAST_FM_FOUNDING_YEAR (2002) - """ - # calculates the number of listens in each time range for each user, count(listen.listened_at) so that - # group without listens are counted as 0, count(*) gives 1. - result = run_query(""" - WITH dist_user_id AS ( - SELECT DISTINCT user_id FROM listens - ), intermediate_table AS ( - SELECT dist_user_id.user_id AS user_id - , to_unix_timestamp(first(time_range.start)) as from_ts - , to_unix_timestamp(first(time_range.end)) as to_ts - , time_range.time_range AS time_range - , count(listens.listened_at) as listen_count - FROM dist_user_id - CROSS JOIN time_range - LEFT JOIN listens - ON listens.listened_at BETWEEN time_range.start AND time_range.end - AND listens.user_id = dist_user_id.user_id - GROUP BY dist_user_id.user_id - , time_range.time_range - ) - SELECT user_id - , sort_array( - collect_list( - struct(from_ts, to_ts, time_range, listen_count) - ) - ) AS listening_activity - FROM intermediate_table - GROUP BY user_id - """) - return result.toLocalIterator() - - def get_listening_activity(stats_range: str, message_type="user_listening_activity", database: str = None)\ -> Iterator[Optional[Dict]]: """ Compute the number of listens for a time range compared to the previous range @@ -69,70 +28,5 @@ def get_listening_activity(stats_range: str, message_type="user_listening_activi details). These values are used on the listening activity reports. """ logger.debug(f"Calculating listening_activity_{stats_range}") - from_date, to_date, _, _, _ = setup_time_range(stats_range) - get_listens_from_dump(from_date, to_date).createOrReplaceTempView("listens") - data = calculate_listening_activity() - messages = create_messages(data=data, stats_range=stats_range, - from_date=from_date, to_date=to_date, - message_type=message_type, database=database) - logger.debug("Done!") - return messages - - -def create_messages(data, stats_range: str, from_date: datetime, to_date: datetime, - message_type: str, database: str = None) -> Iterator[Optional[Dict]]: - """ - Create messages to send the data to webserver via RabbitMQ - - Args: - data: Data to send to webserver - stats_range: The range for which the statistics have been calculated - from_date: The start time of the stats - to_date: The end time of the stats - message_type: used to decide which handler on LB webserver side should - handle this message. 
can be "user_entity" or "year_in_music_listens_per_day" - database: the name of the database in which the webserver should store the data - Returns: - messages: A list of messages to be sent via RabbitMQ - """ - if database is None: - database = f"listening_activity_{stats_range}" - - yield { - "type": "couchdb_data_start", - "database": database - } - - from_ts = int(from_date.timestamp()) - to_ts = int(to_date.timestamp()) - - for entries in chunked(data, USERS_PER_MESSAGE): - multiple_user_stats = [] - for entry in entries: - _dict = entry.asDict(recursive=True) - try: - UserStatRecords[ListeningActivityRecord]( - user_id=_dict["user_id"], - data=_dict["listening_activity"] - ) - multiple_user_stats.append({ - "user_id": _dict["user_id"], - "data": _dict["listening_activity"] - }) - except ValidationError: - logger.error(f"""ValidationError while calculating {stats_range} listening_activity for user: - {_dict["user_id"]}. Data: {json.dumps(_dict, indent=3)}""", exc_info=True) - - yield { - "type": message_type, - "stats_range": stats_range, - "from_ts": from_ts, - "to_ts": to_ts, - "data": multiple_user_stats, - "database": database - } - - yield { - "type": "couchdb_data_end", - "database": database - } + entity_obj = ListeningActivityUserEntity(stats_range, database, message_type) + return entity_obj.main(0) From 090e25fa1fa2b855fc84ecac3a835ff139a5f958 Mon Sep 17 00:00:00 2001 From: Kartik Ohri Date: Fri, 10 Jan 2025 01:06:01 +0530 Subject: [PATCH 14/20] fix imports --- listenbrainz_spark/stats/user/listening_activity.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/listenbrainz_spark/stats/user/listening_activity.py b/listenbrainz_spark/stats/user/listening_activity.py index bd3c508143..fbb9153096 100644 --- a/listenbrainz_spark/stats/user/listening_activity.py +++ b/listenbrainz_spark/stats/user/listening_activity.py @@ -1,19 +1,7 @@ -import json import logging -from datetime import datetime, date from typing import Iterator, Optional, Dict -from more_itertools import chunked -from pydantic import ValidationError - -from data.model.common_stat_spark import UserStatRecords, StatMessage -from data.model.user_listening_activity import ListeningActivityRecord -from listenbrainz_spark.stats import run_query -from listenbrainz_spark.stats.common.listening_activity import setup_time_range from listenbrainz_spark.stats.incremental.user.listening_activity import ListeningActivityUserEntity -from listenbrainz_spark.stats.user import USERS_PER_MESSAGE -from listenbrainz_spark.utils import get_listens_from_dump - logger = logging.getLogger(__name__) From c512ffb35ea8711356777cbc0c7a734184465b97 Mon Sep 17 00:00:00 2001 From: Kartik Ohri Date: Fri, 10 Jan 2025 11:43:35 +0530 Subject: [PATCH 15/20] Refactor incremental stats classes for YIM support --- .../stats/common/listening_activity.py | 18 ++++++++++--- .../stats/incremental/__init__.py | 17 +++++++++--- .../stats/incremental/user/artist.py | 5 ++-- .../stats/incremental/user/daily_activity.py | 7 +++-- .../stats/incremental/user/entity.py | 7 ++--- .../incremental/user/listening_activity.py | 10 ++++--- .../stats/incremental/user/recording.py | 5 ++-- .../stats/incremental/user/release.py | 5 ++-- .../stats/incremental/user/release_group.py | 5 ++-- .../stats/user/daily_activity.py | 2 +- listenbrainz_spark/stats/user/entity.py | 14 +++------- .../stats/user/listening_activity.py | 4 +-- .../year_in_music/listens_per_day.py | 27 +++++-------------- listenbrainz_spark/year_in_music/top_stats.py | 19 ++++++------- 14 files 
changed, 78 insertions(+), 67 deletions(-) diff --git a/listenbrainz_spark/stats/common/listening_activity.py b/listenbrainz_spark/stats/common/listening_activity.py index cb6720ba89..55a117d759 100644 --- a/listenbrainz_spark/stats/common/listening_activity.py +++ b/listenbrainz_spark/stats/common/listening_activity.py @@ -55,7 +55,7 @@ def get_two_quarters_ago_offset(_date: date) -> relativedelta: return relativedelta(month=4, day=1) -def _get_time_range_bounds(stats_range: str) -> Tuple[datetime, datetime, relativedelta, str, str]: +def _get_time_range_bounds(stats_range: str, year: int = None) -> Tuple[datetime, datetime, relativedelta, str, str]: """ Returns the start time, end time, segment step size, python date format and spark date format to use for calculating the listening activity stats @@ -65,6 +65,8 @@ def _get_time_range_bounds(stats_range: str) -> Tuple[datetime, datetime, relati Python date format reference: https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes Spark date format reference: https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html + If stats_range is set to year_in_music then the year must also be provided. + .. note:: other stats uses a different function (get_dates_for_stats_range) to calculate @@ -72,6 +74,16 @@ def _get_time_range_bounds(stats_range: str) -> Tuple[datetime, datetime, relati """ latest_listen_ts = get_latest_listen_ts() + if stats_range == "year_in_music": + if year is None: + raise ValueError("year is required when stats_range is set to year_in_music") + from_date = datetime(year, 1, 1) + to_date = datetime.combine(date(year, 12, 31), time.max) + step = relativedelta(days=+1) + date_format = "%d %B %Y" + spark_date_format = "dd MMMM y" + return from_date, to_date, step, date_format, spark_date_format + if stats_range == "all_time": # all_time stats range is easy, just return time from LASTFM founding # to the latest listen we have in spark @@ -190,7 +202,7 @@ def _create_time_range_df(from_date, to_date, step, date_format, spark_date_form time_range_df.createOrReplaceTempView("time_range") -def setup_time_range(stats_range: str) -> Tuple[datetime, datetime, relativedelta, str, str]: +def setup_time_range(stats_range: str, year: int = None) -> Tuple[datetime, datetime, relativedelta, str, str]: """ Sets up time range buckets needed to calculate listening activity stats and returns the start and end time of the time range. @@ -203,6 +215,6 @@ def setup_time_range(stats_range: str) -> Tuple[datetime, datetime, relativedelt will return 1st of last year as the start time and the current date as the end time in this example. """ - from_date, to_date, step, date_format, spark_date_format = _get_time_range_bounds(stats_range) + from_date, to_date, step, date_format, spark_date_format = _get_time_range_bounds(stats_range, year) _create_time_range_df(from_date, to_date, step, date_format, spark_date_format) return from_date, to_date, step, date_format, spark_date_format diff --git a/listenbrainz_spark/stats/incremental/__init__.py b/listenbrainz_spark/stats/incremental/__init__.py index 21d4651988..5638b5e97e 100644 --- a/listenbrainz_spark/stats/incremental/__init__.py +++ b/listenbrainz_spark/stats/incremental/__init__.py @@ -42,15 +42,23 @@ class IncrementalStats(abc.ABC): is absent in incremental listens. 
""" - def __init__(self, entity: str, stats_range: str): + def __init__(self, entity: str, stats_range: str = None, from_date: datetime = None, to_date: datetime = None): """ Args: entity: The entity for which statistics are generated. stats_range: The statistics range to calculate the stats for. + from_date: date from which listens to use for this stat + to_date: date until which listens to use for this stat + + If both from_date and to_date are specified, they will be used instead of stats_range. """ self.entity = entity - self.stats_range = stats_range - self.from_date, self.to_date = get_dates_for_stats_range(stats_range) + if from_date and to_date: + self.stats_range = f"{self.from_date.strftime('%Y%m%d')}_{self.to_date.strftime('%Y%m%d')}" + self.from_date, self.to_date = from_date, to_date + else: + self.stats_range = stats_range + self.from_date, self.to_date = get_dates_for_stats_range(stats_range) self._cache_tables = [] @abc.abstractmethod @@ -145,7 +153,8 @@ def partial_aggregate_usable(self) -> bool: .json(f"{HDFS_CLUSTER_URI}{metadata_path}") \ .collect()[0] existing_from_date, existing_to_date = metadata["from_date"], metadata["to_date"] - existing_aggregate_fresh = existing_from_date.date() == self.from_date.date() + existing_aggregate_fresh = existing_from_date.date() == self.from_date.date() \ + and existing_to_date.date() <= self.to_date.date() except AnalysisException: existing_aggregate_fresh = False diff --git a/listenbrainz_spark/stats/incremental/user/artist.py b/listenbrainz_spark/stats/incremental/user/artist.py index bf3f69694e..692c28c076 100644 --- a/listenbrainz_spark/stats/incremental/user/artist.py +++ b/listenbrainz_spark/stats/incremental/user/artist.py @@ -9,8 +9,9 @@ class ArtistUserEntity(UserEntity): - def __init__(self, stats_range, database, message_type): - super().__init__(entity="artists", stats_range=stats_range, database=database, message_type=message_type) + def __init__(self, stats_range, database, message_type, from_date=None, to_date=None): + super().__init__(entity="artists", stats_range=stats_range, database=database, message_type=message_type, + from_date=from_date, to_date=to_date) def get_cache_tables(self) -> List[str]: return [ARTIST_COUNTRY_CODE_DATAFRAME] diff --git a/listenbrainz_spark/stats/incremental/user/daily_activity.py b/listenbrainz_spark/stats/incremental/user/daily_activity.py index 5f3f1eb2cf..5db03e53e6 100644 --- a/listenbrainz_spark/stats/incremental/user/daily_activity.py +++ b/listenbrainz_spark/stats/incremental/user/daily_activity.py @@ -18,8 +18,11 @@ class DailyActivityUserEntity(UserEntity): - def __init__(self, stats_range, database, message_type): - super().__init__(entity="daily_activity", stats_range=stats_range, database=database, message_type=message_type) + def __init__(self, stats_range, database, message_type, from_date=None, to_date=None): + super().__init__( + entity="daily_activity", stats_range=stats_range, database=database, + message_type=message_type, from_date=from_date, to_date=to_date + ) self.setup_time_range() def setup_time_range(self): diff --git a/listenbrainz_spark/stats/incremental/user/entity.py b/listenbrainz_spark/stats/incremental/user/entity.py index 1f9211dd4b..be29ca7702 100644 --- a/listenbrainz_spark/stats/incremental/user/entity.py +++ b/listenbrainz_spark/stats/incremental/user/entity.py @@ -1,7 +1,7 @@ import abc import json import logging -from datetime import date +from datetime import date, datetime from typing import Optional, Iterator, Dict, Tuple from more_itertools 
import chunked @@ -31,8 +31,9 @@ class UserEntity(IncrementalStats, abc.ABC): - def __init__(self, entity: str, stats_range: str, database: Optional[str], message_type: Optional[str]): - super().__init__(entity, stats_range) + def __init__(self, entity: str, stats_range: str = None, database: str = None, message_type: str = None, + from_date: datetime = None, to_date: datetime = None): + super().__init__(entity, stats_range, from_date, to_date) if database: self.database = database else: diff --git a/listenbrainz_spark/stats/incremental/user/listening_activity.py b/listenbrainz_spark/stats/incremental/user/listening_activity.py index 68768b1f12..2a923b5278 100644 --- a/listenbrainz_spark/stats/incremental/user/listening_activity.py +++ b/listenbrainz_spark/stats/incremental/user/listening_activity.py @@ -15,9 +15,13 @@ class ListeningActivityUserEntity(UserEntity): - def __init__(self, stats_range, database, message_type): - super().__init__(entity="listening_activity", stats_range=stats_range, database=database, message_type=message_type) - self.from_date, self.to_date, _, __, self.spark_date_format = setup_time_range(stats_range) + def __init__(self, stats_range, database, message_type, year=None): + super().__init__( + entity="listening_activity", stats_range=stats_range, + database=database, message_type=message_type + ) + self.year = year + self.from_date, self.to_date, _, __, self.spark_date_format = setup_time_range(self.stats_range, self.year) def get_cache_tables(self) -> List[str]: return [] diff --git a/listenbrainz_spark/stats/incremental/user/recording.py b/listenbrainz_spark/stats/incremental/user/recording.py index 994bd5b4a9..869317d853 100644 --- a/listenbrainz_spark/stats/incremental/user/recording.py +++ b/listenbrainz_spark/stats/incremental/user/recording.py @@ -11,8 +11,9 @@ class RecordingUserEntity(UserEntity): - def __init__(self, stats_range, database, message_type): - super().__init__(entity="recordings", stats_range=stats_range, database=database, message_type=message_type) + def __init__(self, stats_range, database, message_type, from_date=None, to_date=None): + super().__init__(entity="recordings", stats_range=stats_range, database=database, message_type=message_type, + from_date=from_date, to_date=to_date) def get_cache_tables(self) -> List[str]: return [RECORDING_ARTIST_DATAFRAME, RELEASE_METADATA_CACHE_DATAFRAME] diff --git a/listenbrainz_spark/stats/incremental/user/release.py b/listenbrainz_spark/stats/incremental/user/release.py index 6119c474cf..61f1ea1dbb 100644 --- a/listenbrainz_spark/stats/incremental/user/release.py +++ b/listenbrainz_spark/stats/incremental/user/release.py @@ -10,8 +10,9 @@ class ReleaseUserEntity(UserEntity): - def __init__(self, stats_range, database, message_type): - super().__init__(entity="releases", stats_range=stats_range, database=database, message_type=message_type) + def __init__(self, stats_range, database, message_type, from_date, to_date): + super().__init__(entity="releases", stats_range=stats_range, database=database, message_type=message_type, + from_date=from_date, to_date=to_date) def get_cache_tables(self) -> List[str]: return [RELEASE_METADATA_CACHE_DATAFRAME] diff --git a/listenbrainz_spark/stats/incremental/user/release_group.py b/listenbrainz_spark/stats/incremental/user/release_group.py index 535bffcc69..ab5b0288a5 100644 --- a/listenbrainz_spark/stats/incremental/user/release_group.py +++ b/listenbrainz_spark/stats/incremental/user/release_group.py @@ -11,8 +11,9 @@ class 
ReleaseGroupUserEntity(UserEntity): - def __init__(self, stats_range, database, message_type): - super().__init__(entity="release_groups", stats_range=stats_range, database=database, message_type=message_type) + def __init__(self, stats_range, database, message_type, from_date=None, to_date=None): + super().__init__(entity="release_groups", stats_range=stats_range, database=database, message_type=message_type, + from_date=from_date, to_date=to_date) def get_cache_tables(self) -> List[str]: return [RELEASE_METADATA_CACHE_DATAFRAME, RELEASE_GROUP_METADATA_CACHE_DATAFRAME] diff --git a/listenbrainz_spark/stats/user/daily_activity.py b/listenbrainz_spark/stats/user/daily_activity.py index 77576f4086..9704dbb585 100644 --- a/listenbrainz_spark/stats/user/daily_activity.py +++ b/listenbrainz_spark/stats/user/daily_activity.py @@ -10,5 +10,5 @@ def get_daily_activity(stats_range: str, database: str = None) -> Iterator[Optional[Dict]]: """ Calculate number of listens for an user for the specified time range """ logger.debug(f"Calculating daily_activity_{stats_range}") - entity_obj = DailyActivityUserEntity(stats_range, database, message_type="user_daily_activity") + entity_obj = DailyActivityUserEntity(stats_range, database, "user_daily_activity") return entity_obj.main(0) diff --git a/listenbrainz_spark/stats/user/entity.py b/listenbrainz_spark/stats/user/entity.py index 0eaf680886..2b0de2ff67 100644 --- a/listenbrainz_spark/stats/user/entity.py +++ b/listenbrainz_spark/stats/user/entity.py @@ -16,19 +16,11 @@ } NUMBER_OF_TOP_ENTITIES = 1000 # number of top entities to retain for user stats -NUMBER_OF_YIM_ENTITIES = 50 # number of top entities to retain for Year in Music stats -def get_entity_stats(entity: str, stats_range: str, message_type: str = "user_entity", database: str = None) \ - -> Iterator[Optional[Dict]]: +def get_entity_stats(entity: str, stats_range: str, database: str = None) -> Iterator[Optional[Dict]]: """ Get the top entity for all users for specified stats_range """ logger.debug(f"Calculating user_{entity}_{stats_range}...") - - if message_type == "year_in_music_top_stats": - number_of_results = NUMBER_OF_YIM_ENTITIES - else: - number_of_results = NUMBER_OF_TOP_ENTITIES - entity_cls = incremental_entity_map[entity] - entity_obj = entity_cls(stats_range, database, message_type) - return entity_obj.main(number_of_results) + entity_obj = entity_cls(stats_range, database, "user_entity") + return entity_obj.main(NUMBER_OF_TOP_ENTITIES) diff --git a/listenbrainz_spark/stats/user/listening_activity.py b/listenbrainz_spark/stats/user/listening_activity.py index fbb9153096..224756bf51 100644 --- a/listenbrainz_spark/stats/user/listening_activity.py +++ b/listenbrainz_spark/stats/user/listening_activity.py @@ -5,7 +5,7 @@ logger = logging.getLogger(__name__) -def get_listening_activity(stats_range: str, message_type="user_listening_activity", database: str = None)\ +def get_listening_activity(stats_range: str, database: str = None)\ -> Iterator[Optional[Dict]]: """ Compute the number of listens for a time range compared to the previous range @@ -16,5 +16,5 @@ def get_listening_activity(stats_range: str, message_type="user_listening_activi details). These values are used on the listening activity reports. 
""" logger.debug(f"Calculating listening_activity_{stats_range}") - entity_obj = ListeningActivityUserEntity(stats_range, database, message_type) + entity_obj = ListeningActivityUserEntity(stats_range, database, "user_listening_activity") return entity_obj.main(0) diff --git a/listenbrainz_spark/year_in_music/listens_per_day.py b/listenbrainz_spark/year_in_music/listens_per_day.py index 04fc76599e..8c6bcc48d9 100644 --- a/listenbrainz_spark/year_in_music/listens_per_day.py +++ b/listenbrainz_spark/year_in_music/listens_per_day.py @@ -1,27 +1,12 @@ -from datetime import datetime, date, time - -from dateutil.relativedelta import relativedelta - -from listenbrainz_spark.stats.common.listening_activity import _create_time_range_df -from listenbrainz_spark.stats.user.listening_activity import calculate_listening_activity, create_messages -from listenbrainz_spark.utils import get_listens_from_dump +from listenbrainz_spark.stats.incremental.user.listening_activity import ListeningActivityUserEntity def calculate_listens_per_day(year): - from_date = datetime(year, 1, 1) - to_date = datetime.combine(date(year, 12, 31), time.max) - step = relativedelta(days=+1) - date_format = "%d %B %Y" - spark_date_format = "dd MMMM y" - - _create_time_range_df(from_date, to_date, step, date_format, spark_date_format) - listens = get_listens_from_dump(from_date, to_date) - listens.createOrReplaceTempView("listens") - - data = calculate_listening_activity() - stats = create_messages(data=data, stats_range="year_in_music", from_date=from_date, - to_date=to_date, message_type="year_in_music_listens_per_day") - for message in stats: + entity_obj = ListeningActivityUserEntity( + stats_range="year_in_music", database=None, message_type="year_in_music_listens_per_day", + year=year + ) + for message in entity_obj.main(0): # yim stats are stored in postgres instead of couchdb so drop those messages for yim if message["type"] == "couchdb_data_start" or message["type"] == "couchdb_data_end": continue diff --git a/listenbrainz_spark/year_in_music/top_stats.py b/listenbrainz_spark/year_in_music/top_stats.py index 711aca865c..fc8a59ca0f 100644 --- a/listenbrainz_spark/year_in_music/top_stats.py +++ b/listenbrainz_spark/year_in_music/top_stats.py @@ -1,21 +1,22 @@ from datetime import datetime, date, time -from listenbrainz_spark.stats.user.entity import get_entity_stats_for_range +from listenbrainz_spark.stats.incremental.user.artist import ArtistUserEntity +from listenbrainz_spark.stats.incremental.user.recording import RecordingUserEntity +from listenbrainz_spark.stats.incremental.user.release_group import ReleaseGroupUserEntity + +NUMBER_OF_YIM_ENTITIES = 50 def calculate_top_entity_stats(year): from_date = datetime(year, 1, 1) to_date = datetime.combine(date(year, 12, 31), time.max) - for entity in ["artists", "recordings", "release_groups"]: - stats = get_entity_stats_for_range( - entity, - "this_year", - from_date, - to_date, - "year_in_music_top_stats" + for entity_cls in [ArtistUserEntity, RecordingUserEntity, ReleaseGroupUserEntity]: + entity_obj = entity_cls( + stats_range=None, database=None, from_date=from_date, to_date=to_date, + message_type="year_in_music_top_stats" ) - for message in stats: + for message in entity_obj.main(50): # yim stats are stored in postgres instead of couchdb so drop those messages for yim if message["type"] == "couchdb_data_start" or message["type"] == "couchdb_data_end": continue From 250a076fb4a0a215be3585aacdfff4b5e51270e3 Mon Sep 17 00:00:00 2001 From: Kartik Ohri Date: Fri, 10 Jan 
2025 13:33:33 +0530 Subject: [PATCH 16/20] fix ReleaseUserEntity __init__ --- listenbrainz_spark/stats/incremental/user/release.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/listenbrainz_spark/stats/incremental/user/release.py b/listenbrainz_spark/stats/incremental/user/release.py index 61f1ea1dbb..52ed0ad5a5 100644 --- a/listenbrainz_spark/stats/incremental/user/release.py +++ b/listenbrainz_spark/stats/incremental/user/release.py @@ -10,7 +10,7 @@ class ReleaseUserEntity(UserEntity): - def __init__(self, stats_range, database, message_type, from_date, to_date): + def __init__(self, stats_range, database, message_type, from_date=None, to_date=None): super().__init__(entity="releases", stats_range=stats_range, database=database, message_type=message_type, from_date=from_date, to_date=to_date) From a55557bbcfef3dcc483b1bc99ac61fcb0e76ada1 Mon Sep 17 00:00:00 2001 From: Kartik Ohri Date: Fri, 10 Jan 2025 14:18:46 +0530 Subject: [PATCH 17/20] fix whitespace --- listenbrainz_spark/stats/incremental/user/artist.py | 4 ++-- listenbrainz_spark/stats/incremental/user/daily_activity.py | 2 +- listenbrainz_spark/stats/incremental/user/entity.py | 2 +- .../stats/incremental/user/listening_activity.py | 4 ++-- listenbrainz_spark/stats/incremental/user/recording.py | 4 ++-- listenbrainz_spark/stats/incremental/user/release.py | 4 ++-- listenbrainz_spark/stats/incremental/user/release_group.py | 4 ++-- 7 files changed, 12 insertions(+), 12 deletions(-) diff --git a/listenbrainz_spark/stats/incremental/user/artist.py b/listenbrainz_spark/stats/incremental/user/artist.py index 692c28c076..b4782c3bba 100644 --- a/listenbrainz_spark/stats/incremental/user/artist.py +++ b/listenbrainz_spark/stats/incremental/user/artist.py @@ -9,7 +9,7 @@ class ArtistUserEntity(UserEntity): - def __init__(self, stats_range, database, message_type, from_date=None, to_date=None): + def __init__(self, stats_range, database, message_type, from_date=None, to_date=None): super().__init__(entity="artists", stats_range=stats_range, database=database, message_type=message_type, from_date=from_date, to_date=to_date) @@ -70,7 +70,7 @@ def combine_aggregates(self, existing_aggregate, incremental_aggregate): , artist_name , artist_mbid , listen_count - FROM {incremental_aggregate} + FROM {incremental_aggregate} ) SELECT user_id , first(artist_name) AS artist_name diff --git a/listenbrainz_spark/stats/incremental/user/daily_activity.py b/listenbrainz_spark/stats/incremental/user/daily_activity.py index 5db03e53e6..0cb0301a63 100644 --- a/listenbrainz_spark/stats/incremental/user/daily_activity.py +++ b/listenbrainz_spark/stats/incremental/user/daily_activity.py @@ -18,7 +18,7 @@ class DailyActivityUserEntity(UserEntity): - def __init__(self, stats_range, database, message_type, from_date=None, to_date=None): + def __init__(self, stats_range, database, message_type, from_date=None, to_date=None): super().__init__( entity="daily_activity", stats_range=stats_range, database=database, message_type=message_type, from_date=from_date, to_date=to_date diff --git a/listenbrainz_spark/stats/incremental/user/entity.py b/listenbrainz_spark/stats/incremental/user/entity.py index be29ca7702..e60d432590 100644 --- a/listenbrainz_spark/stats/incremental/user/entity.py +++ b/listenbrainz_spark/stats/incremental/user/entity.py @@ -31,7 +31,7 @@ class UserEntity(IncrementalStats, abc.ABC): - def __init__(self, entity: str, stats_range: str = None, database: str = None, message_type: str = None, + def __init__(self, entity: str, 
stats_range: str = None, database: str = None, message_type: str = None, from_date: datetime = None, to_date: datetime = None): super().__init__(entity, stats_range, from_date, to_date) if database: diff --git a/listenbrainz_spark/stats/incremental/user/listening_activity.py b/listenbrainz_spark/stats/incremental/user/listening_activity.py index 2a923b5278..69264626d8 100644 --- a/listenbrainz_spark/stats/incremental/user/listening_activity.py +++ b/listenbrainz_spark/stats/incremental/user/listening_activity.py @@ -15,7 +15,7 @@ class ListeningActivityUserEntity(UserEntity): - def __init__(self, stats_range, database, message_type, year=None): + def __init__(self, stats_range, database, message_type, year=None): super().__init__( entity="listening_activity", stats_range=stats_range, database=database, message_type=message_type @@ -82,7 +82,7 @@ def get_top_n(self, final_aggregate, N): FROM time_range LEFT JOIN {final_aggregate} USING (time_range) - GROUP BY user_id + GROUP BY user_id """ return run_query(query) diff --git a/listenbrainz_spark/stats/incremental/user/recording.py b/listenbrainz_spark/stats/incremental/user/recording.py index 869317d853..5fee4f49a1 100644 --- a/listenbrainz_spark/stats/incremental/user/recording.py +++ b/listenbrainz_spark/stats/incremental/user/recording.py @@ -11,7 +11,7 @@ class RecordingUserEntity(UserEntity): - def __init__(self, stats_range, database, message_type, from_date=None, to_date=None): + def __init__(self, stats_range, database, message_type, from_date=None, to_date=None): super().__init__(entity="recordings", stats_range=stats_range, database=database, message_type=message_type, from_date=from_date, to_date=to_date) @@ -93,7 +93,7 @@ def combine_aggregates(self, existing_aggregate, incremental_aggregate): , caa_id , caa_release_mbid , listen_count - FROM {incremental_aggregate} + FROM {incremental_aggregate} ) SELECT user_id , first(recording_name) AS recording_name diff --git a/listenbrainz_spark/stats/incremental/user/release.py b/listenbrainz_spark/stats/incremental/user/release.py index 52ed0ad5a5..994e719b1e 100644 --- a/listenbrainz_spark/stats/incremental/user/release.py +++ b/listenbrainz_spark/stats/incremental/user/release.py @@ -10,7 +10,7 @@ class ReleaseUserEntity(UserEntity): - def __init__(self, stats_range, database, message_type, from_date=None, to_date=None): + def __init__(self, stats_range, database, message_type, from_date=None, to_date=None): super().__init__(entity="releases", stats_range=stats_range, database=database, message_type=message_type, from_date=from_date, to_date=to_date) @@ -92,7 +92,7 @@ def combine_aggregates(self, existing_aggregate, incremental_aggregate): , caa_id , caa_release_mbid , listen_count - FROM {incremental_aggregate} + FROM {incremental_aggregate} ) SELECT user_id , first(release_name) AS release_name diff --git a/listenbrainz_spark/stats/incremental/user/release_group.py b/listenbrainz_spark/stats/incremental/user/release_group.py index ab5b0288a5..e0fb723503 100644 --- a/listenbrainz_spark/stats/incremental/user/release_group.py +++ b/listenbrainz_spark/stats/incremental/user/release_group.py @@ -11,7 +11,7 @@ class ReleaseGroupUserEntity(UserEntity): - def __init__(self, stats_range, database, message_type, from_date=None, to_date=None): + def __init__(self, stats_range, database, message_type, from_date=None, to_date=None): super().__init__(entity="release_groups", stats_range=stats_range, database=database, message_type=message_type, from_date=from_date, to_date=to_date) @@ -98,7 +98,7 
                 , caa_id
                 , caa_release_mbid
                 , listen_count
-            FROM {incremental_aggregate}
+            FROM {incremental_aggregate}
        )
        SELECT user_id
             , first(release_group_name) AS release_group_name

From a8506f0e0e67dacbb49affea1ecde6a7398fd639 Mon Sep 17 00:00:00 2001
From: Kartik Ohri
Date: Fri, 10 Jan 2025 14:20:07 +0530
Subject: [PATCH 18/20] fix whitespace - 2

---
 listenbrainz_spark/stats/incremental/user/entity.py | 2 +-
 listenbrainz_spark/stats/user/listening_activity.py | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/listenbrainz_spark/stats/incremental/user/entity.py b/listenbrainz_spark/stats/incremental/user/entity.py
index e60d432590..3fee951fe2 100644
--- a/listenbrainz_spark/stats/incremental/user/entity.py
+++ b/listenbrainz_spark/stats/incremental/user/entity.py
@@ -32,7 +32,7 @@ class UserEntity(IncrementalStats, abc.ABC):
 
     def __init__(self, entity: str, stats_range: str = None, database: str = None, message_type: str = None,
-                 from_date: datetime = None, to_date: datetime = None):
+                 from_date: datetime = None, to_date: datetime = None):
         super().__init__(entity, stats_range, from_date, to_date)
         if database:
             self.database = database
diff --git a/listenbrainz_spark/stats/user/listening_activity.py b/listenbrainz_spark/stats/user/listening_activity.py
index 224756bf51..a97395625f 100644
--- a/listenbrainz_spark/stats/user/listening_activity.py
+++ b/listenbrainz_spark/stats/user/listening_activity.py
@@ -5,6 +5,7 @@
 
 logger = logging.getLogger(__name__)
 
+
 def get_listening_activity(stats_range: str, database: str = None)\
         -> Iterator[Optional[Dict]]:
     """ Compute the number of listens for a time range compared to the previous range

From da8255ac8eecdb8d1754ede97c6cb1f8c580a324 Mon Sep 17 00:00:00 2001
From: Kartik Ohri
Date: Fri, 10 Jan 2025 15:29:28 +0530
Subject: [PATCH 19/20] fix ReleaseSitewideEntity

---
 listenbrainz_spark/stats/incremental/sitewide/release.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/listenbrainz_spark/stats/incremental/sitewide/release.py b/listenbrainz_spark/stats/incremental/sitewide/release.py
index 32c8c622fa..5ed046118c 100644
--- a/listenbrainz_spark/stats/incremental/sitewide/release.py
+++ b/listenbrainz_spark/stats/incremental/sitewide/release.py
@@ -9,8 +9,8 @@
 
 class ReleaseSitewideEntity(SitewideEntity):
 
-    def __init__(self):
-        super().__init__(entity="releases")
+    def __init__(self, stats_range):
+        super().__init__(entity="releases", stats_range=stats_range)
 
     def get_cache_tables(self) -> List[str]:
         return [RELEASE_METADATA_CACHE_DATAFRAME]

From dc98164da52768d5624c6e1a509c7ae022566393 Mon Sep 17 00:00:00 2001
From: Kartik Ohri
Date: Fri, 24 Jan 2025 17:53:31 +0530
Subject: [PATCH 20/20] Add documentation

---
 listenbrainz_spark/stats/incremental/__init__.py            | 1 +
 listenbrainz_spark/stats/incremental/user/artist.py         | 1 +
 listenbrainz_spark/stats/incremental/user/daily_activity.py | 3 ++-
 listenbrainz_spark/stats/incremental/user/entity.py         | 4 ++++
 .../stats/incremental/user/listening_activity.py            | 1 +
 listenbrainz_spark/stats/incremental/user/recording.py      | 1 +
 listenbrainz_spark/stats/incremental/user/release.py        | 1 +
 listenbrainz_spark/stats/incremental/user/release_group.py  | 1 +
 8 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/listenbrainz_spark/stats/incremental/__init__.py b/listenbrainz_spark/stats/incremental/__init__.py
index 5638b5e97e..e1602cc2e0 100644
--- a/listenbrainz_spark/stats/incremental/__init__.py
+++ b/listenbrainz_spark/stats/incremental/__init__.py
@@ -191,6 +191,7 @@ def create_partial_aggregate(self) -> DataFrame:
         return full_df
 
     def incremental_dump_exists(self) -> bool:
+        """ Check if an incremental dump exists. """
         return hdfs_connection.client.status(INCREMENTAL_DUMPS_SAVE_PATH, strict=False)
 
     def create_incremental_aggregate(self) -> DataFrame:
diff --git a/listenbrainz_spark/stats/incremental/user/artist.py b/listenbrainz_spark/stats/incremental/user/artist.py
index b4782c3bba..bd7f167f0d 100644
--- a/listenbrainz_spark/stats/incremental/user/artist.py
+++ b/listenbrainz_spark/stats/incremental/user/artist.py
@@ -8,6 +8,7 @@
 
 class ArtistUserEntity(UserEntity):
+    """ See base class IncrementalStats for documentation. """
 
     def __init__(self, stats_range, database, message_type, from_date=None, to_date=None):
         super().__init__(entity="artists", stats_range=stats_range, database=database, message_type=message_type,
diff --git a/listenbrainz_spark/stats/incremental/user/daily_activity.py b/listenbrainz_spark/stats/incremental/user/daily_activity.py
index 0cb0301a63..668f7af3a6 100644
--- a/listenbrainz_spark/stats/incremental/user/daily_activity.py
+++ b/listenbrainz_spark/stats/incremental/user/daily_activity.py
@@ -17,6 +17,7 @@
 
 class DailyActivityUserEntity(UserEntity):
+    """ See base class IncrementalStats for documentation. """
 
     def __init__(self, stats_range, database, message_type, from_date=None, to_date=None):
         super().__init__(
@@ -26,7 +27,7 @@ def __init__(self, stats_range, database, message_type, from_date=None, to_date=
         self.setup_time_range()
 
     def setup_time_range(self):
-        # Genarate a dataframe containing hours of all days of the week
+        """ Generate a dataframe containing hours of all days of the week. """
         weekdays = [calendar.day_name[day] for day in range(0, 7)]
         hours = [hour for hour in range(0, 24)]
         time_range = itertools.product(weekdays, hours)
diff --git a/listenbrainz_spark/stats/incremental/user/entity.py b/listenbrainz_spark/stats/incremental/user/entity.py
index 3fee951fe2..a0b393477b 100644
--- a/listenbrainz_spark/stats/incremental/user/entity.py
+++ b/listenbrainz_spark/stats/incremental/user/entity.py
@@ -30,6 +30,7 @@
 
 class UserEntity(IncrementalStats, abc.ABC):
+    """ See base class IncrementalStats for documentation. """
 
     def __init__(self, entity: str, stats_range: str = None, database: str = None, message_type: str = None,
                  from_date: datetime = None, to_date: datetime = None):
@@ -47,6 +48,9 @@ def get_table_prefix(self) -> str:
         return f"user_{self.entity}_{self.stats_range}"
 
     def filter_existing_aggregate(self, existing_aggregate, incremental_aggregate):
+        """ Filter the existing aggregate to only include listens of users who have listens in the
+        incremental dumps, since only their stats need to be recomputed.
+        """
         query = f"""
             WITH incremental_users AS (
                 SELECT DISTINCT user_id FROM {incremental_aggregate}
diff --git a/listenbrainz_spark/stats/incremental/user/listening_activity.py b/listenbrainz_spark/stats/incremental/user/listening_activity.py
index 69264626d8..730ae5dbd3 100644
--- a/listenbrainz_spark/stats/incremental/user/listening_activity.py
+++ b/listenbrainz_spark/stats/incremental/user/listening_activity.py
@@ -14,6 +14,7 @@
 
 class ListeningActivityUserEntity(UserEntity):
+    """ See base class IncrementalStats for documentation. """
""" def __init__(self, stats_range, database, message_type, year=None): super().__init__( diff --git a/listenbrainz_spark/stats/incremental/user/recording.py b/listenbrainz_spark/stats/incremental/user/recording.py index 5fee4f49a1..af87545450 100644 --- a/listenbrainz_spark/stats/incremental/user/recording.py +++ b/listenbrainz_spark/stats/incremental/user/recording.py @@ -10,6 +10,7 @@ class RecordingUserEntity(UserEntity): + """ See base class IncrementalStats for documentation. """ def __init__(self, stats_range, database, message_type, from_date=None, to_date=None): super().__init__(entity="recordings", stats_range=stats_range, database=database, message_type=message_type, diff --git a/listenbrainz_spark/stats/incremental/user/release.py b/listenbrainz_spark/stats/incremental/user/release.py index 994e719b1e..4e77d54e18 100644 --- a/listenbrainz_spark/stats/incremental/user/release.py +++ b/listenbrainz_spark/stats/incremental/user/release.py @@ -9,6 +9,7 @@ class ReleaseUserEntity(UserEntity): + """ See base class IncrementalStats for documentation. """ def __init__(self, stats_range, database, message_type, from_date=None, to_date=None): super().__init__(entity="releases", stats_range=stats_range, database=database, message_type=message_type, diff --git a/listenbrainz_spark/stats/incremental/user/release_group.py b/listenbrainz_spark/stats/incremental/user/release_group.py index e0fb723503..117d557421 100644 --- a/listenbrainz_spark/stats/incremental/user/release_group.py +++ b/listenbrainz_spark/stats/incremental/user/release_group.py @@ -10,6 +10,7 @@ class ReleaseGroupUserEntity(UserEntity): + """ See base class IncrementalStats for documentation. """ def __init__(self, stats_range, database, message_type, from_date=None, to_date=None): super().__init__(entity="release_groups", stats_range=stats_range, database=database, message_type=message_type,