From 7b4578a79f172b2572701250a3cc9deedb8511e7 Mon Sep 17 00:00:00 2001 From: "Bradley A. Thornton" Date: Tue, 28 May 2024 14:45:14 -0700 Subject: [PATCH] Add multi stream support (#6) * Add multi stream * Remove XML support * Formatting * Add pls support * Delete kingfmva.pls --- tunein/__init__.py | 122 +++++++++++++++++++++-------------- tunein/subcommands/search.py | 57 ++++++++++++---- tunein/xml_helper.py | 67 ------------------- 3 files changed, 117 insertions(+), 129 deletions(-) delete mode 100644 tunein/xml_helper.py diff --git a/tunein/__init__.py b/tunein/__init__.py index 27776fd..1aa4928 100644 --- a/tunein/__init__.py +++ b/tunein/__init__.py @@ -1,5 +1,6 @@ +from urllib.parse import urlparse, urlunparse + import requests -from tunein.xml_helper import xml2dict from tunein.parse import fuzzy_match @@ -15,6 +16,14 @@ def title(self): def artist(self): return self.raw.get("artist", "") + @property + def bit_rate(self): + return self.raw.get("bitrate") + + @property + def media_type(self): + return self.raw.get("media_type") + @property def image(self): return self.raw.get("image") @@ -44,72 +53,87 @@ def dict(self): """Return a dict representation of the station.""" return { "artist": self.artist, + "bit_rate": self.bit_rate, "description": self.description, "image": self.image, "match": self.match(), + "media_type": self.media_type, "stream": self.stream, "title": self.title, } class TuneIn: - search_url = "http://opml.radiotime.com/Search.ashx" - featured_url = "http://opml.radiotime.com/Browse.ashx?c=local" # local stations + search_url = "https://opml.radiotime.com/Search.ashx" + featured_url = "http://opml.radiotime.com/Browse.ashx" # local stations + stnd_query = {"formats": "mp3,aac,ogg,html,hls", "render": "json"} @staticmethod - def get_stream_url(url): - res = requests.get(url) - for url in res.text.splitlines(): - if (len(url) > 4): - if url[-3:] == 'm3u': - return url[:-4] - if url[-3:] == 'pls': - res2 = requests.get(url) - # Loop through the data looking for the first url - for line in res2.text.splitlines(): - if line.startswith("File1="): - return line[6:] - else: - return url + def get_stream_urls(url): + _url = urlparse(url) + for scheme in ("http", "https"): + url_str = urlunparse( + _url._replace(scheme=scheme, query=_url.query + "&render=json") + ) + res = requests.get(url_str) + try: + res.raise_for_status() + break + except requests.exceptions.RequestException: + continue + else: + return "Failed to get stream url" + + stations = res.json().get("body", {}) + + for station in stations: + if station.get("url", "").endswith(".pls"): + res = requests.get(station["url"]) + file1 = [line for line in res.text.split("\n") if line.startswith("File1=")] + if file1: + station["url"] = file1[0].split("File1=")[1] + + return stations @staticmethod def featured(): - res = requests.post(TuneIn.featured_url) - return list(TuneIn._get_stations(res)) + res = requests.post( + TuneIn.featured_url, + data={**TuneIn.stnd_query, **{"c": "local"}} + ) + stations = res.json().get("body", [{}])[0].get("children", []) + return list(TuneIn._get_stations(stations)) @staticmethod def search(query): - res = requests.post(TuneIn.search_url, data={"query": query, "formats": "mp3,aac,ogg,html,hls"}) - return list(TuneIn._get_stations(res, query)) + res = requests.post( + TuneIn.search_url, + data={**TuneIn.stnd_query, **{"query": query}} + ) + stations = res.json().get("body", []) + return list(TuneIn._get_stations(stations, query)) @staticmethod - def _get_stations(res: requests.Response, query: str = ""): - res = xml2dict(res.text) - if not res.get("opml"): - return - # stations might be nested based on Playlist/Search - outline = res['opml']['body']["outline"] - - if not isinstance(outline, list): - return - if outline[0].get("outline"): - stations = outline[0]["outline"] - else: - stations = outline - + def _get_stations(stations: requests.Response, query: str = ""): for entry in stations: - try: - if not entry.get("key") == "unavailable" \ - and entry.get("type") == "audio" \ - and entry.get("item") == "station": - yield TuneInStation( - {"stream": TuneIn.get_stream_url(entry["URL"]), - "url": entry["URL"], - "title": entry.get("current_track") or entry.get("text"), - "artist": entry.get("text"), - "description": entry.get("subtext"), - "image": entry.get("image"), - "query": query - }) - except: + if ( + entry.get("key") == "unavailable" + or entry.get("type") != "audio" + or entry.get("item") != "station" + ): continue + streams = TuneIn.get_stream_urls(entry["URL"]) + for stream in streams: + yield TuneInStation( + { + "stream": stream["url"], + "bitrate": stream["bitrate"], + "media_type": stream["media_type"], + "url": entry["URL"], + "title": entry.get("current_track") or entry.get("text"), + "artist": entry.get("text"), + "description": entry.get("subtext"), + "image": entry.get("image"), + "query": query, + } + ) diff --git a/tunein/subcommands/search.py b/tunein/subcommands/search.py index f09c89b..b6ec99d 100644 --- a/tunein/subcommands/search.py +++ b/tunein/subcommands/search.py @@ -9,6 +9,7 @@ from tunein import TuneIn + class Ansi: """ANSI escape codes.""" @@ -26,17 +27,19 @@ class Ansi: YELLOW = "\x1B[33m" GREY = "\x1B[90m" + NOPRINT_TRANS_TABLE = { i: None for i in range(0, sys.maxunicode + 1) if not chr(i).isprintable() } + class Search: """The search subcommand for tunein.""" def __init__(self: Search, args: argparse.Namespace) -> None: """Initialize the search subcommand.""" self._args: argparse.Namespace = args - + def run(self: Search) -> None: """Run the search subcommand.""" tunein = TuneIn() @@ -45,7 +48,7 @@ def run(self: Search) -> None: if not stations: print(f"No results for {self._args.station}") sys.exit(1) - stations.sort(key=lambda x: x["match"], reverse=True) + stations.sort(key=lambda x: (x["match"], x["bit_rate"]), reverse=True) for station in stations: station["title"] = self._printable(station["title"]) station["artist"] = self._printable(station["artist"]) @@ -55,28 +58,58 @@ def run(self: Search) -> None: print(json.dumps(stations, indent=4)) elif self._args.format == "table": max_widths = {} - columns = ["title", "artist", "description"] + columns = ["title", "bit_rate", "media_type", "artist", "description"] for column in columns: - max_width = max(len(str(station[column])) for station in stations) + max_width = max( + [len(str(station[column])) for station in stations] + [len(column)] + ) if column == "description": term_width = shutil.get_terminal_size().columns - remaining = term_width - max_widths["title"] - max_widths["artist"] - 2 - max_width = min(max_width, remaining) + remaining = ( + term_width + - sum( + [ + max_widths[column] + for column in columns + if column != "description" + ] + ) + - len(columns) + - 1 + ) + max_width = min(max_width, remaining) max_widths[column] = max_width - print(" ".join(column.ljust(max_widths[column]).capitalize() for column in columns)) + print( + " ".join( + column.ljust(max_widths[column]).capitalize().replace("_", " ") + for column in columns + ) + ) print(" ".join("-" * max_widths[column] for column in columns)) for station in stations: line_parts = [] # title as link link = self._term_link(station.get("stream"), station["title"]) - line_parts.append(f"{link}{' '*(max_widths['title']-len(station['title']))}") + line_parts.append( + f"{link}{' '*(max_widths['title']-len(station['title']))}" + ) + # bit rate + line_parts.append( + str(station["bit_rate"]).ljust(max_widths["bit_rate"]) + ) + # media type + line_parts.append( + str(station["media_type"]).ljust(max_widths["media_type"]) + ) # artist line_parts.append(str(station["artist"]).ljust(max_widths["artist"])) # description clipped - line_parts.append(str(station["description"])[:max_widths["description"]]) + line_parts.append( + str(station["description"])[: max_widths["description"]] + ) print(" ".join(line_parts)) - + @staticmethod def _term_link(uri: str, label: str) -> str: """Return a link. @@ -97,12 +130,10 @@ def _term_link(uri: str, label: str) -> str: @staticmethod def _printable(string: str) -> str: """Replace non-printable characters in a string. - + Args: string: The string to replace non-printable characters in. Returns: The string with non-printable characters replaced. """ return string.translate(NOPRINT_TRANS_TABLE) - - diff --git a/tunein/xml_helper.py b/tunein/xml_helper.py deleted file mode 100644 index 75f7236..0000000 --- a/tunein/xml_helper.py +++ /dev/null @@ -1,67 +0,0 @@ -from collections import defaultdict -from xml.etree import cElementTree as ET - - -def etree2dict(t): - d = {t.tag: {} if t.attrib else None} - children = list(t) - if children: - dd = defaultdict(list) - for dc in map(etree2dict, children): - for k, v in dc.items(): - dd[k].append(v) - d = {t.tag: {k: v[0] if len(v) == 1 else v for k, v in dd.items()}} - if t.attrib: - d[t.tag].update((k, v) for k, v in t.attrib.items()) - if t.text: - text = t.text.strip() - if children or t.attrib: - if text: - d[t.tag]['text'] = text - else: - d[t.tag] = text - return d - - -def xml2dict(xml_string): - try: - xml_string = xml_string.replace('xmlns="http://www.w3.org/1999/xhtml"', - "") - e = ET.XML(xml_string) - d = etree2dict(e) - return d - except: - return {} - - -def load_xml2dict(xml_path): - tree = ET.parse(xml_path) - d = etree2dict(tree.getroot()) - return d["xml"] - - -def dict2xml(d, root="xml"): - xml = "<" + root - # props - for k in d: - if isinstance(d[k], str): - if k == "text": - pass - else: - xml += " " + k + ' ="' + d[k] + '"' - - xml += ">" - # insides - for k in d: - if isinstance(d[k], dict): - xml += dict2xml(d[k], k) - if isinstance(d[k], list): - for e in d[k]: - if isinstance(e, dict): - xml += dict2xml(e, k) - if isinstance(d[k], str): - if k == "text": - xml += d[k] - - xml += "" - return xml