From 1748294b94593361e05cee2a84c173a10445c186 Mon Sep 17 00:00:00 2001 From: jimdale Date: Fri, 22 Mar 2024 10:30:05 +0100 Subject: [PATCH] add tqdm download meter, strip out pymonad classes from queryset operations --- pyproject.toml | 3 +- viewser/commands/queryset/models/queryset.py | 2 +- viewser/commands/queryset/operations.py | 207 ++++++++----------- viewser/remotes.py | 4 + 4 files changed, 93 insertions(+), 123 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 6d16dc6..cf4854f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "viewser" -version = "6.1.1" +version = "6.2.0" description = "The Views 3 CLI tool" authors = ["peder2911 "] readme = "README.md" @@ -27,6 +27,7 @@ psutil = "^5.8.0" strconv = "^0.4.2" pyarrow = ">9.0.0" views-storage = "^1.1.0" +tqdm = "^4.66.0" [tool.poetry.scripts] viewser = "viewser.cli:viewser" diff --git a/viewser/commands/queryset/models/queryset.py b/viewser/commands/queryset/models/queryset.py index 315c47c..efad717 100644 --- a/viewser/commands/queryset/models/queryset.py +++ b/viewser/commands/queryset/models/queryset.py @@ -185,5 +185,5 @@ def fetch(self, *args, **kwargs): Requires a self.push first. """ logger.info(f"Fetching queryset {self.name}") - dataset = queryset_operations.fetch(self.name, *args, **kwargs)#.maybe(None, lambda x:x) + dataset = queryset_operations.fetch(self.name) return dataset diff --git a/viewser/commands/queryset/operations.py b/viewser/commands/queryset/operations.py index 99a80b4..e4433be 100644 --- a/viewser/commands/queryset/operations.py +++ b/viewser/commands/queryset/operations.py @@ -6,40 +6,35 @@ import sys import time from typing import Optional -from io import BytesIO, BufferedWriter -from datetime import date +from urllib import parse +from tqdm import tqdm +import json import logging -from pyarrow.lib import ArrowInvalid import pandas as pd import io import requests -from toolz.functoolz import do, curry -from pymonad.either import Either, Left, Right -from pymonad.maybe import Just, Nothing, Maybe -from views_schema import viewser as viewser_schema from views_schema import queryset_manager as queryset_schema -from viewser import remotes -from viewser.error_handling import errors, error_handling -from viewser.tui import animations -from IPython.display import display, clear_output +from viewser.error_handling import error_handling + +from IPython.display import clear_output from . import queryset_list logger = logging.getLogger(__name__) -response_json = lambda rsp: rsp.json() class QuerysetOperations(): def __init__(self, - remote_url: str, - error_handler: Optional[error_handling.ErrorDumper] = None, - max_retries: int = sys.maxsize): + remote_url: str, + error_handler: Optional[error_handling.ErrorDumper] = None, + max_retries: int = sys.maxsize): + self._remote_url = remote_url self._max_retries = max_retries self._error_handler = error_handler if error_handler else error_handling.ErrorDumper([]) - def fetch(self, queryset_name:str, out_file: Optional[BufferedWriter] = None, start_date: Optional[date] = None, end_date: Optional[date] = None) -> Maybe[pd.DataFrame]: + def fetch(self, queryset_name: str) -> pd.DataFrame: """ fetch ===== @@ -57,13 +52,11 @@ def fetch(self, queryset_name:str, out_file: Optional[BufferedWriter] = None, st self._max_retries, self._remote_url, queryset_name, - start_date, end_date) -# if out_file is not None: -# f.then(curry(do, lambda data: data.to_parquet(out_file))) + ) + return f -# return f.either(self._error_handler.dump, Just) - def list(self) -> Maybe[queryset_list.QuerysetList]: + def list(self) -> queryset_list.QuerysetList: """ list ==== @@ -72,63 +65,42 @@ def list(self) -> Maybe[queryset_list.QuerysetList]: Optional[List[str]]: Returns a list of queryset name if operation succeeds. """ - return (self._request("GET", remotes.status_checks, "querysets") - .then(lambda r: r.json()) - .then(lambda d: queryset_list.QuerysetList(**d)) - .either(self._error_handler.dump, Just)) - def show(self, name: str) -> Maybe[queryset_schema.DetailQueryset]: - """ - show - ==== + response = requests.request(method="GET", url=f'{self._remote_url}/querysets') - parameters: - name (str): Name of the queryset to show + qs_list = queryset_list.QuerysetList() - returns: - Optional[viewser_schema.queryset_manager.DetailQueryset]: Returns queryset model if successful. + qs_list.querysets = response.content - """ - return (self._request("GET", remotes.status_checks, f"querysets/{name}") - .then(lambda r: r.json()) - .then(lambda d: queryset_schema.DetailQueryset(**d)) - .either(self._error_handler.dump, Just)) - - def publish(self, queryset: queryset_schema.Queryset, overwrite: bool = True) -> Maybe[requests.Response]: - (self._request( - "POST", - remotes.status_checks, - "querysets", - parameters = Just({"overwrite":overwrite}), - data = Just(queryset.dict())) - .either(self._error_handler.dump, Just)) - - def delete(self, name: str) -> Maybe[requests.Response]: - (self._request( "DELETE", - remotes.status_checks, - f"querysets/{name}", - ) - .either(self._error_handler.dump, Just)) - - def _request(self, method: str, checks, path, **kwargs) -> Either[viewser_schema.Dump, requests.Response]: - return remotes.request(self._remote_url, method, checks, path, **kwargs) - - def _deserialize(self, response: requests.Response) -> Either[viewser_schema.Dump, pd.DataFrame]: - if response.status_code == 202: - # No data yet - return Right(None) - else: - try: - return Right(pd.read_parquet(BytesIO(response.content))) - except (OSError, ArrowInvalid): - return Left(errors.deserialization_error(response)) - - def _fetch( - self, - max_retries : int, - base_url: str, name: str, - start_date: Optional[date] = None, end_date: Optional[date] = None - ) -> Either[viewser_schema.Dump, pd.DataFrame]: + return qs_list + + def publish(self, queryset: queryset_schema.Queryset, overwrite: bool = True) -> requests.Response: + + method = "POST" + + url = self._remote_url + "/querysets?" + parse.urlencode({"overwrite": overwrite}) + + request_kwargs = {"headers": {}} + + request_kwargs.update({"data": json.dumps(queryset.dict())}) + + request_kwargs["headers"].update({"Content-Type": "application/json"}) + + response = requests.request(method=method, url=url, **request_kwargs) + + return response + + def delete(self, name: str) -> requests.Response: + + method = "DELETE" + + url = self._remote_url + f"/querysets{name}" + + response = requests.request(method=method, url=url) + + return response + + def _fetch(self, max_retries: int, base_url: str, name: str) -> pd.DataFrame: """ _fetch ====== @@ -136,73 +108,65 @@ def _fetch( Args: base_url(str) name(str) - start_date(Optional[str]): Only fetch data after start_date - start_date(Optional[str]): Only fetch data before end_date Returns: - Either[errors.Dump, pd.DataFrame] + pd.DataFrame """ - start_date, end_date = [date.strftime("%Y-%m-%d") if date else None for date in (start_date, end_date)] - - checks = [ - remotes.check_4xx, - remotes.check_error, - remotes.check_404, - ] - - parameters = { - k:v for k,v in {"start_date":start_date, "end_date":end_date}.items() if v is not None - } - parameters = Just(parameters) if len(parameters) > 0 else Nothing + + def overprint(message_string, last_line_length, end): + space = ' ' + new_line_length = len(message_string) + pad = max(0, last_line_length - new_line_length) + print(f'{message_string}{(pad + 1) * space}', end=end) + + return new_line_length + path = f"data/{name}" + empty_df = pd.DataFrame() retries = 0 + delay = 5 failed = False succeeded = False + block_size = 1024 + last_line_length = 0 - space = ' ' + + url = base_url + '/' + path + '/' while not (succeeded or failed): - if retries > 0: - time.sleep(5) - data = remotes.request(base_url, "GET", checks, path, parameters=parameters) + data = io.BytesIO() - try: - data = pd.read_parquet(io.BytesIO(data.value.content)) + response = requests.get(url, stream=True) + total_size = int(response.headers.get("content-length", 0)) - message_string = f'{retries + 1}: Queryset {name} read successfully' - new_line_length = len(message_string) - pad = last_line_length - new_line_length - last_line_length = new_line_length + if total_size > 1e6: + with tqdm(total=total_size, unit="B", unit_scale=True) as progress_bar: + for segment in response.iter_content(block_size): + progress_bar.update(len(segment)) + data.write(segment) - if pad > 0: - print(f'{retries+1}: Queryset {name} read successfully {(pad + 1)*space}') - else: - print(f'{retries + 1}: Queryset {name} read successfully') + else: + for segment in response.iter_content(block_size): + data.write(segment) - succeeded = True - except: - message = data.value.content.decode() - if retries == 0: + try: + data = pd.read_parquet(data) - message_string = f'{retries + 1}: {message}' - last_line_length = len(message_string) - else: + message_string = f'Queryset {name} read successfully' + new_line_length = overprint(message_string, last_line_length, end="\n") - message_string = f'{retries + 1}: {message}' - new_line_length = len(message_string) - pad = last_line_length - new_line_length - last_line_length = new_line_length + succeeded = True - if pad > 0: - print(f'{retries + 1}: {message} {(pad + 1)*space}', end="\r") - else: - print(f'{retries + 1}: {message}', end="\r") + except: + message = data.getvalue().decode() + message_string = f'{retries + 1}: {message}' + last_line_length = overprint(message_string, last_line_length, end="\r") if 'failed' in message: failed = True - data = message + data = empty_df if retries > max_retries: @@ -210,8 +174,9 @@ def _fetch( print(f'Max attempts ({max_retries}) to retrieve {name} exceeded: aborting retrieval', end="\r") failed = True - data = message + data = empty_df retries += 1 + time.sleep(delay) - return data \ No newline at end of file + return data diff --git a/viewser/remotes.py b/viewser/remotes.py index e3d9774..b4fd382 100644 --- a/viewser/remotes.py +++ b/viewser/remotes.py @@ -46,6 +46,9 @@ def update_kwargs(kwargs, data): .maybe(request_kwargs, curry(update_kwargs, request_kwargs)) ) + print(request_args) + print(request_kwargs) + try: response = requests.request(*request_args,**request_kwargs) except requests.exceptions.ConnectionError: @@ -189,6 +192,7 @@ def request( data.then(str).then(lambda r: f"POSTing {r}").then(logger.debug) url = make_url(base_url, path, parameters) + print('url:',url) response = url.then(curry(make_request, method = method, json_data = data)) return compose_checks(checks)(response)