Incremental stats sitewide #3114

Open
wants to merge 26 commits into base: master
Commits
26 commits
169a3f6
interim checkin
amCap1712 Dec 30, 2024
006b367
fix table use
amCap1712 Dec 30, 2024
4805bd2
fix combined table
amCap1712 Dec 30, 2024
11c8042
fix partial df use
amCap1712 Dec 30, 2024
ca2e1fb
add per user limit for sitewide stats
amCap1712 Dec 30, 2024
bb62d95
testing more scenarios
amCap1712 Dec 31, 2024
d64f7ea
refactor incremental sitewide stats
amCap1712 Dec 31, 2024
570e3ce
fix import
amCap1712 Dec 31, 2024
6fae52c
add all time incremental stats for other entities
amCap1712 Dec 31, 2024
4e037c2
Delete partial sitewide aggregates on import of full dump
amCap1712 Jan 2, 2025
85ea8a4
Add bookkeeping for using aggregates of any stats_range
amCap1712 Jan 2, 2025
0d64068
fix imports
amCap1712 Jan 2, 2025
1fe0397
fix metadata path
amCap1712 Jan 2, 2025
da7fce5
add logging to debug
amCap1712 Jan 2, 2025
28551bd
fix existing agg usable check
amCap1712 Jan 2, 2025
6b97954
add schema to json read
amCap1712 Jan 2, 2025
b9fd965
fix skip_trash arg in dump upload
amCap1712 Jan 6, 2025
dd9ec00
Refactor SitewideEntity for sharing with other stats
amCap1712 Jan 7, 2025
1b9df3a
Fix constructors
amCap1712 Jan 7, 2025
f1af83c
Fix call to generate_stats
amCap1712 Jan 7, 2025
a9a62ce
Fix call to generate_stats - 2
amCap1712 Jan 7, 2025
aeb4385
Fix aggregates cleanup and remove outdated tests
amCap1712 Jan 8, 2025
53a72f9
add missing path
amCap1712 Jan 8, 2025
e60295f
make sitewide listening activity incremental
amCap1712 Jan 8, 2025
7dca0d0
simply generating stats if incremental dump doesn't exist
amCap1712 Jan 8, 2025
836236d
Refactor create messages and stats validation into class
amCap1712 Jan 9, 2025
5 changes: 4 additions & 1 deletion listenbrainz_spark/hdfs/upload.py
@@ -6,7 +6,7 @@
import logging
from typing import List

from listenbrainz_spark import schema, path, utils
from listenbrainz_spark import schema, path, utils, hdfs_connection
from listenbrainz_spark.hdfs.utils import create_dir
from listenbrainz_spark.hdfs.utils import delete_dir
from listenbrainz_spark.hdfs.utils import path_exists
@@ -180,3 +180,6 @@ def process_full_listens_dump(self):
.partitionBy("year", "month") \
.mode("overwrite") \
.parquet(path.LISTENBRAINZ_INTERMEDIATE_STATS_DIRECTORY)

if path_exists(path.LISTENBRAINZ_BASE_STATS_DIRECTORY):
hdfs_connection.client.delete(path.LISTENBRAINZ_BASE_STATS_DIRECTORY, recursive=True, skip_trash=True)
3 changes: 3 additions & 0 deletions listenbrainz_spark/path.py
@@ -5,6 +5,9 @@

LISTENBRAINZ_INTERMEDIATE_STATS_DIRECTORY = os.path.join('/', 'data', 'stats-new')

LISTENBRAINZ_BASE_STATS_DIRECTORY = os.path.join('/', 'stats')
LISTENBRAINZ_SITEWIDE_STATS_DIRECTORY = os.path.join(LISTENBRAINZ_BASE_STATS_DIRECTORY, 'sitewide')

# MLHD+ dump files
MLHD_PLUS_RAW_DATA_DIRECTORY = os.path.join("/", "mlhd-raw")
MLHD_PLUS_DATA_DIRECTORY = os.path.join("/", "mlhd") # processed MLHD+ dump data
5 changes: 5 additions & 0 deletions listenbrainz_spark/schema.py
@@ -3,6 +3,11 @@
from pyspark.sql.types import StructField, StructType, ArrayType, StringType, TimestampType, FloatType, \
IntegerType, LongType

BOOKKEEPING_SCHEMA = StructType([
StructField('from_date', TimestampType(), nullable=False),
StructField('to_date', TimestampType(), nullable=False),
StructField('created', TimestampType(), nullable=False),
])

mlhd_schema = StructType([
StructField('user_id', StringType(), nullable=False),
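The new BOOKKEEPING_SCHEMA records which listen window a stored partial aggregate covers and when it was built; later code in this PR writes one such record per stat and reads it back to decide whether the stored aggregate is still fresh. A minimal standalone sketch of that round trip, assuming a generic SparkSession and an illustrative metadata path (both are placeholders, not names from this PR):

# Sketch only: write and read a bookkeeping record using BOOKKEEPING_SCHEMA.
# The Spark session and metadata_path below are illustrative placeholders.
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, TimestampType

BOOKKEEPING_SCHEMA = StructType([
    StructField('from_date', TimestampType(), nullable=False),
    StructField('to_date', TimestampType(), nullable=False),
    StructField('created', TimestampType(), nullable=False),
])

spark = SparkSession.builder.appName("bookkeeping-sketch").getOrCreate()
metadata_path = "/stats/bookkeeping/example"  # hypothetical path, for illustration only

# Record the listen window the partial aggregate covers and when it was built.
metadata_df = spark.createDataFrame(
    [(datetime(2024, 1, 1), datetime(2024, 12, 31), datetime.now())],
    schema=BOOKKEEPING_SCHEMA,
)
metadata_df.write.mode("overwrite").json(metadata_path)

# Read it back later to decide whether the stored aggregate can be reused.
metadata = spark.read.schema(BOOKKEEPING_SCHEMA).json(metadata_path).collect()[0]
print(metadata["from_date"], metadata["to_date"], metadata["created"])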
197 changes: 197 additions & 0 deletions listenbrainz_spark/stats/incremental/__init__.py
@@ -0,0 +1,197 @@
import abc
from datetime import datetime
from pathlib import Path
from typing import List

from pyspark.errors import AnalysisException
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType

import listenbrainz_spark
from listenbrainz_spark import hdfs_connection
from listenbrainz_spark.config import HDFS_CLUSTER_URI
from listenbrainz_spark.path import INCREMENTAL_DUMPS_SAVE_PATH
from listenbrainz_spark.schema import BOOKKEEPING_SCHEMA
from listenbrainz_spark.stats import get_dates_for_stats_range
from listenbrainz_spark.utils import read_files_from_HDFS, logger, get_listens_from_dump


class IncrementalStats(abc.ABC):
"""
Provides a framework for generating incremental statistics for a given entity (e.g., users, tracks)
over a specified date range.

In the ListenBrainz Spark cluster, full dump listens (which remain constant for ~15 days) and incremental listens
(ingested daily) are the two main sources of data. Incremental listens are cleared whenever a new full dump is
imported. Aggregating full dump listens daily for various statistics is inefficient since this data does not
change.

To optimize this process:

1. A partial aggregate is generated from the full dump listens the first time a stat is requested. This partial
aggregate is stored in HDFS for future use, eliminating the need for redundant full dump aggregation.
2. Incremental listens are aggregated daily. Because all incremental listens since the full dump’s import are
aggregated (not just those ingested today), this step involves some redundant computation.
3. The incremental aggregate is combined with the existing partial aggregate, forming a combined aggregate from
which final statistics are generated.

For non-sitewide statistics, further optimization is possible:

If an entity’s listens (e.g., for a user) are not present in the incremental listens, its statistics do not
need to be recalculated. Similarly, entity-level listener stats can skip recomputation when relevant data
is absent in incremental listens.
"""

def __init__(self, entity: str, stats_range: str):
"""
Args:
entity: The entity for which statistics are generated.
stats_range: The statistics range to calculate the stats for.
"""
self.entity = entity
self.stats_range = stats_range
self.from_date, self.to_date = get_dates_for_stats_range(stats_range)
self._cache_tables = []

@abc.abstractmethod
def get_base_path(self) -> str:
""" Returns the base HDFS path for storing partial data and metadata for this category of statistics. """
raise NotImplementedError()

def get_existing_aggregate_path(self) -> str:
""" Returns the HDFS path for existing aggregate data. """
return f"{self.get_base_path()}/aggregates/{self.entity}/{self.stats_range}"

def get_bookkeeping_path(self) -> str:
""" Returns the HDFS path for bookkeeping metadata. """
return f"{self.get_base_path()}/bookkeeping/{self.entity}/{self.stats_range}"

@abc.abstractmethod
def get_partial_aggregate_schema(self) -> StructType:
""" Returns the spark schema of the partial aggregates created during generation of this stat. """
raise NotImplementedError()

@abc.abstractmethod
def aggregate(self, table: str, cache_tables: List[str]) -> DataFrame:
"""
Create partial aggregates from the given listens.

Args:
table: The listen table to aggregate.
cache_tables: List of metadata cache tables.

Returns:
DataFrame: The aggregated DataFrame.
"""
raise NotImplementedError()

@abc.abstractmethod
def combine_aggregates(self, existing_aggregate: str, incremental_aggregate: str) -> DataFrame:
"""
Combines the existing aggregate and the incremental aggregate into the final aggregate from which stats are generated.

Args:
existing_aggregate: The table name for existing aggregate.
incremental_aggregate: The table name for incremental aggregate.

Returns:
DataFrame: The combined DataFrame.
"""
raise NotImplementedError()

@abc.abstractmethod
def get_top_n(self, final_aggregate: str, N: int) -> DataFrame:
"""
Obtain the top N entities for the given statistic from the final aggregate.

Args:
final_aggregate: The table name for the final aggregate.
N: The number of top entities to retrieve.

Returns:
DataFrame: The DataFrame containing the top N entities.
"""
raise NotImplementedError()

@abc.abstractmethod
def get_cache_tables(self) -> List[str]:
""" Returns the list of HDFS paths for the metadata cache tables required by the statistic. """
raise NotImplementedError()

def setup_cache_tables(self):
""" Set up metadata cache tables by reading data from HDFS and creating temporary views. """
cache_tables = []
for idx, df_path in enumerate(self.get_cache_tables()):
df_name = f"entity_data_cache_{idx}"
cache_tables.append(df_name)
read_files_from_HDFS(df_path).createOrReplaceTempView(df_name)
self._cache_tables = cache_tables

@abc.abstractmethod
def get_table_prefix(self) -> str:
""" Get the prefix for table names based on the stat type, entity and stats range. """
raise NotImplementedError()

def partial_aggregate_usable(self) -> bool:
""" Checks whether a partial aggregate exists and is fresh to generate the required stats. """
metadata_path = self.get_bookkeeping_path()
existing_aggregate_path = self.get_existing_aggregate_path()

try:
metadata = listenbrainz_spark \
.session \
.read \
.schema(BOOKKEEPING_SCHEMA) \
.json(f"{HDFS_CLUSTER_URI}{metadata_path}") \
.collect()[0]
existing_from_date, existing_to_date = metadata["from_date"], metadata["to_date"]
existing_aggregate_fresh = existing_from_date.date() == self.from_date.date()
except AnalysisException:
existing_aggregate_fresh = False

existing_aggregate_exists = hdfs_connection.client.status(existing_aggregate_path, strict=False)

return existing_aggregate_fresh and existing_aggregate_exists

def create_partial_aggregate(self) -> DataFrame:
"""
Create a new partial aggregate from full dump listens.

Returns:
DataFrame: The generated partial aggregate DataFrame.
"""
metadata_path = self.get_bookkeeping_path()
existing_aggregate_path = self.get_existing_aggregate_path()

table = f"{self.get_table_prefix()}_full_listens"
get_listens_from_dump(self.from_date, self.to_date, include_incremental=False).createOrReplaceTempView(table)

logger.info("Creating partial aggregate from full dump listens")
hdfs_connection.client.makedirs(Path(existing_aggregate_path).parent)
full_df = self.aggregate(table, self._cache_tables)
full_df.write.mode("overwrite").parquet(existing_aggregate_path)

hdfs_connection.client.makedirs(Path(metadata_path).parent)
metadata_df = listenbrainz_spark.session.createDataFrame(
[(self.from_date, self.to_date, datetime.now())],
schema=BOOKKEEPING_SCHEMA
)
metadata_df.write.mode("overwrite").json(metadata_path)
logger.info("Finished creating partial aggregate from full dump listens")

return full_df

def incremental_dump_exists(self) -> bool:
return hdfs_connection.client.status(INCREMENTAL_DUMPS_SAVE_PATH, strict=False)

def create_incremental_aggregate(self) -> DataFrame:
"""
Create an incremental aggregate from incremental listens.

Returns:
DataFrame: The generated incremental aggregate DataFrame.
"""
table = f"{self.get_table_prefix()}_incremental_listens"
read_files_from_HDFS(INCREMENTAL_DUMPS_SAVE_PATH) \
.createOrReplaceTempView(table)
return self.aggregate(table, self._cache_tables)
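The orchestration that ties these abstract methods together is not part of this file (it lives in the SitewideEntity subclass elsewhere in the PR), but the intended flow follows the class docstring: reuse or rebuild the partial aggregate from full dump listens, aggregate the incremental listens, combine the two, and read the stats off the combined table. A minimal sketch of that flow, using a hypothetical run_incremental_stats driver written only for illustration:

# Illustrative driver only; the PR's real orchestration lives in SitewideEntity
# and may differ in its details.
from pyspark.sql import DataFrame

from listenbrainz_spark.stats.incremental import IncrementalStats
from listenbrainz_spark.utils import read_files_from_HDFS


def run_incremental_stats(stat: IncrementalStats, top_n: int = 1000) -> DataFrame:
    stat.setup_cache_tables()
    prefix = stat.get_table_prefix()

    # 1. Reuse the stored partial aggregate of full dump listens if it is still
    #    fresh; otherwise rebuild it from the full dump.
    if not stat.partial_aggregate_usable():
        stat.create_partial_aggregate()
    existing_table = f"{prefix}_existing_aggregate"
    read_files_from_HDFS(stat.get_existing_aggregate_path()) \
        .createOrReplaceTempView(existing_table)

    if stat.incremental_dump_exists():
        # 2. Aggregate the incremental listens ingested since the full dump import.
        incremental_table = f"{prefix}_incremental_aggregate"
        stat.create_incremental_aggregate().createOrReplaceTempView(incremental_table)

        # 3. Combine both aggregates and compute the final stats from the result.
        final_table = f"{prefix}_final_aggregate"
        stat.combine_aggregates(existing_table, incremental_table) \
            .createOrReplaceTempView(final_table)
    else:
        final_table = existing_table

    return stat.get_top_n(final_table, top_n)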
Empty file.
111 changes: 111 additions & 0 deletions listenbrainz_spark/stats/incremental/sitewide/artist.py
@@ -0,0 +1,111 @@
from typing import List

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

from listenbrainz_spark.path import ARTIST_COUNTRY_CODE_DATAFRAME
from listenbrainz_spark.stats import run_query
from listenbrainz_spark.stats.incremental.sitewide.entity import SitewideEntity


class AritstSitewideEntity(SitewideEntity):

def __init__(self, stats_range):
super().__init__(entity="artists", stats_range=stats_range)

def get_cache_tables(self) -> List[str]:
return [ARTIST_COUNTRY_CODE_DATAFRAME]

def get_partial_aggregate_schema(self):
return StructType([
StructField("artist_name", StringType(), nullable=False),
StructField("artist_mbid", StringType(), nullable=True),
StructField("listen_count", IntegerType(), nullable=False),
])

def aggregate(self, table, cache_tables):
user_listen_count_limit = self.get_listen_count_limit()
cache_table = cache_tables[0]
result = run_query(f"""
WITH exploded_listens AS (
SELECT user_id
, artist_name AS artist_credit_name
, explode_outer(artist_credit_mbids) AS artist_mbid
FROM {table}
), listens_with_mb_data as (
SELECT user_id
, COALESCE(at.artist_name, el.artist_credit_name) AS artist_name
, el.artist_mbid
FROM exploded_listens el
LEFT JOIN {cache_table} at
ON el.artist_mbid = at.artist_mbid
), user_counts as (
SELECT user_id
, first(artist_name) AS any_artist_name
, artist_mbid
, LEAST(count(*), {user_listen_count_limit}) as listen_count
FROM listens_with_mb_data
GROUP BY user_id
, lower(artist_name)
, artist_mbid
)
SELECT first(any_artist_name) AS artist_name
, artist_mbid
, SUM(listen_count) as listen_count
FROM user_counts
GROUP BY lower(any_artist_name)
, artist_mbid
""")
return result

def combine_aggregates(self, existing_aggregate, incremental_aggregate):
query = f"""
WITH intermediate_table AS (
SELECT artist_name
, artist_mbid
, listen_count
FROM {existing_aggregate}
UNION ALL
SELECT artist_name
, artist_mbid
, listen_count
FROM {incremental_aggregate}
)
SELECT first(artist_name) AS artist_name
, artist_mbid
, sum(listen_count) as listen_count
FROM intermediate_table
GROUP BY lower(artist_name)
, artist_mbid
"""
return run_query(query)

def get_top_n(self, final_aggregate, N):
query = f"""
WITH entity_count AS (
SELECT count(*) AS total_count
FROM {final_aggregate}
), ordered_stats AS (
SELECT *
FROM {final_aggregate}
ORDER BY listen_count DESC
LIMIT {N}
), grouped_stats AS (
SELECT sort_array(
collect_list(
struct(
listen_count
, artist_name
, artist_mbid
)
)
, false
) AS stats
FROM ordered_stats
)
SELECT total_count
, stats
FROM grouped_stats
JOIN entity_count
ON TRUE
"""
return run_query(query)
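To make the output shape concrete, here is a hedged usage sketch that runs this class through the illustrative run_incremental_stats driver sketched above (not part of this PR). The collected row mirrors what get_top_n produces: a total_count column plus a stats array of (listen_count, artist_name, artist_mbid) structs sorted by listen count, highest first.

# Usage sketch; run_incremental_stats is the hypothetical driver from the earlier
# sketch and is assumed to be in scope, not an API defined in this PR.
from listenbrainz_spark.stats.incremental.sitewide.artist import AritstSitewideEntity

stat = AritstSitewideEntity(stats_range="week")
row = run_incremental_stats(stat, top_n=5).collect()[0]

print(row["total_count"])        # number of rows in the combined aggregate
for entry in row["stats"]:       # structs ordered by listen_count, highest first
    print(entry["listen_count"], entry["artist_name"], entry["artist_mbid"])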