Skip to content

Commit

Permalink
Merge pull request #57 from prio-data/downlaod_meter
Browse files Browse the repository at this point in the history
add tqdm download meter, strip out pymonad classes from queryset oper…
  • Loading branch information
jimdale authored Mar 22, 2024
2 parents 89ea2d8 + 1748294 commit a356cb4
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 123 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "viewser"
version = "6.1.1"
version = "6.2.0"
description = "The Views 3 CLI tool"
authors = ["peder2911 <[email protected]>"]
readme = "README.md"
Expand All @@ -27,6 +27,7 @@ psutil = "^5.8.0"
strconv = "^0.4.2"
pyarrow = ">9.0.0"
views-storage = "^1.1.0"
tqdm = "^4.66.0"

[tool.poetry.scripts]
viewser = "viewser.cli:viewser"
Expand Down
2 changes: 1 addition & 1 deletion viewser/commands/queryset/models/queryset.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,5 +185,5 @@ def fetch(self, *args, **kwargs):
Requires a self.push first.
"""
logger.info(f"Fetching queryset {self.name}")
dataset = queryset_operations.fetch(self.name, *args, **kwargs)#.maybe(None, lambda x:x)
dataset = queryset_operations.fetch(self.name)
return dataset
207 changes: 86 additions & 121 deletions viewser/commands/queryset/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,35 @@
import sys
import time
from typing import Optional
from io import BytesIO, BufferedWriter
from datetime import date
from urllib import parse
from tqdm import tqdm
import json
import logging
from pyarrow.lib import ArrowInvalid
import pandas as pd
import io
import requests
from toolz.functoolz import do, curry
from pymonad.either import Either, Left, Right
from pymonad.maybe import Just, Nothing, Maybe
from views_schema import viewser as viewser_schema
from views_schema import queryset_manager as queryset_schema
from viewser import remotes
from viewser.error_handling import errors, error_handling
from viewser.tui import animations
from IPython.display import display, clear_output
from viewser.error_handling import error_handling

from IPython.display import clear_output

from . import queryset_list

logger = logging.getLogger(__name__)

response_json = lambda rsp: rsp.json()

class QuerysetOperations():

def __init__(self,
remote_url: str,
error_handler: Optional[error_handling.ErrorDumper] = None,
max_retries: int = sys.maxsize):
remote_url: str,
error_handler: Optional[error_handling.ErrorDumper] = None,
max_retries: int = sys.maxsize):

self._remote_url = remote_url
self._max_retries = max_retries
self._error_handler = error_handler if error_handler else error_handling.ErrorDumper([])

def fetch(self, queryset_name:str, out_file: Optional[BufferedWriter] = None, start_date: Optional[date] = None, end_date: Optional[date] = None) -> Maybe[pd.DataFrame]:
def fetch(self, queryset_name: str) -> pd.DataFrame:
"""
fetch
=====
Expand All @@ -57,13 +52,11 @@ def fetch(self, queryset_name:str, out_file: Optional[BufferedWriter] = None, st
self._max_retries,
self._remote_url,
queryset_name,
start_date, end_date)
# if out_file is not None:
# f.then(curry(do, lambda data: data.to_parquet(out_file)))
)

return f
# return f.either(self._error_handler.dump, Just)

def list(self) -> Maybe[queryset_list.QuerysetList]:
def list(self) -> queryset_list.QuerysetList:
"""
list
====
Expand All @@ -72,146 +65,118 @@ def list(self) -> Maybe[queryset_list.QuerysetList]:
Optional[List[str]]: Returns a list of queryset name if operation succeeds.
"""
return (self._request("GET", remotes.status_checks, "querysets")
.then(lambda r: r.json())
.then(lambda d: queryset_list.QuerysetList(**d))
.either(self._error_handler.dump, Just))

def show(self, name: str) -> Maybe[queryset_schema.DetailQueryset]:
"""
show
====
response = requests.request(method="GET", url=f'{self._remote_url}/querysets')

parameters:
name (str): Name of the queryset to show
qs_list = queryset_list.QuerysetList()

returns:
Optional[viewser_schema.queryset_manager.DetailQueryset]: Returns queryset model if successful.
qs_list.querysets = response.content

"""
return (self._request("GET", remotes.status_checks, f"querysets/{name}")
.then(lambda r: r.json())
.then(lambda d: queryset_schema.DetailQueryset(**d))
.either(self._error_handler.dump, Just))

def publish(self, queryset: queryset_schema.Queryset, overwrite: bool = True) -> Maybe[requests.Response]:
(self._request(
"POST",
remotes.status_checks,
"querysets",
parameters = Just({"overwrite":overwrite}),
data = Just(queryset.dict()))
.either(self._error_handler.dump, Just))

def delete(self, name: str) -> Maybe[requests.Response]:
(self._request( "DELETE",
remotes.status_checks,
f"querysets/{name}",
)
.either(self._error_handler.dump, Just))

def _request(self, method: str, checks, path, **kwargs) -> Either[viewser_schema.Dump, requests.Response]:
return remotes.request(self._remote_url, method, checks, path, **kwargs)

def _deserialize(self, response: requests.Response) -> Either[viewser_schema.Dump, pd.DataFrame]:
if response.status_code == 202:
# No data yet
return Right(None)
else:
try:
return Right(pd.read_parquet(BytesIO(response.content)))
except (OSError, ArrowInvalid):
return Left(errors.deserialization_error(response))

def _fetch(
self,
max_retries : int,
base_url: str, name: str,
start_date: Optional[date] = None, end_date: Optional[date] = None
) -> Either[viewser_schema.Dump, pd.DataFrame]:
return qs_list

def publish(self, queryset: queryset_schema.Queryset, overwrite: bool = True) -> requests.Response:
    """
    publish
    =======

    POST the queryset definition to the remote API, optionally
    overwriting an existing queryset with the same name.

    parameters:
        queryset (queryset_schema.Queryset): queryset definition to upload
        overwrite (bool): replace an existing queryset of the same name

    returns:
        requests.Response: raw HTTP response from the server
    """
    # Overwrite flag travels as a query-string parameter, the queryset
    # itself as a JSON request body.
    query = parse.urlencode({"overwrite": overwrite})
    target = f"{self._remote_url}/querysets?{query}"

    payload = json.dumps(queryset.dict())
    headers = {"Content-Type": "application/json"}

    return requests.request(method="POST", url=target, data=payload, headers=headers)

def delete(self, name: str) -> requests.Response:
    """
    delete
    ======

    Issue a DELETE request removing the named queryset from the remote.

    parameters:
        name (str): name of the queryset to delete

    returns:
        requests.Response: raw HTTP response from the server
    """
    # Bug fix: the path previously read f"/querysets{name}", which built
    # e.g. ".../querysetsmy_qs" (missing separator) and could never match
    # the server's /querysets/{name} route.
    url = f"{self._remote_url}/querysets/{name}"

    return requests.request(method="DELETE", url=url)

def _fetch(self, max_retries: int, base_url: str, name: str) -> pd.DataFrame:
"""
_fetch
======
Fetches queryset located at {base_url}/querysets/data/{name}
Args:
base_url(str)
name(str)
start_date(Optional[str]): Only fetch data after start_date
start_date(Optional[str]): Only fetch data before end_date
Returns:
Either[errors.Dump, pd.DataFrame]
pd.DataFrame
"""
start_date, end_date = [date.strftime("%Y-%m-%d") if date else None for date in (start_date, end_date)]

checks = [
remotes.check_4xx,
remotes.check_error,
remotes.check_404,
]

parameters = {
k:v for k,v in {"start_date":start_date, "end_date":end_date}.items() if v is not None
}
parameters = Just(parameters) if len(parameters) > 0 else Nothing

def overprint(message_string, last_line_length, end):
space = ' '
new_line_length = len(message_string)
pad = max(0, last_line_length - new_line_length)
print(f'{message_string}{(pad + 1) * space}', end=end)

return new_line_length

path = f"data/{name}"

empty_df = pd.DataFrame()
retries = 0
delay = 5

failed = False
succeeded = False
block_size = 1024

last_line_length = 0
space = ' '

url = base_url + '/' + path + '/'

while not (succeeded or failed):
if retries > 0:
time.sleep(5)

data = remotes.request(base_url, "GET", checks, path, parameters=parameters)
data = io.BytesIO()

try:
data = pd.read_parquet(io.BytesIO(data.value.content))
response = requests.get(url, stream=True)
total_size = int(response.headers.get("content-length", 0))

message_string = f'{retries + 1}: Queryset {name} read successfully'
new_line_length = len(message_string)
pad = last_line_length - new_line_length
last_line_length = new_line_length
if total_size > 1e6:
with tqdm(total=total_size, unit="B", unit_scale=True) as progress_bar:
for segment in response.iter_content(block_size):
progress_bar.update(len(segment))
data.write(segment)

if pad > 0:
print(f'{retries+1}: Queryset {name} read successfully {(pad + 1)*space}')
else:
print(f'{retries + 1}: Queryset {name} read successfully')
else:
for segment in response.iter_content(block_size):
data.write(segment)

succeeded = True
except:
message = data.value.content.decode()
if retries == 0:
try:
data = pd.read_parquet(data)

message_string = f'{retries + 1}: {message}'
last_line_length = len(message_string)
else:
message_string = f'Queryset {name} read successfully'
new_line_length = overprint(message_string, last_line_length, end="\n")

message_string = f'{retries + 1}: {message}'
new_line_length = len(message_string)
pad = last_line_length - new_line_length
last_line_length = new_line_length
succeeded = True

if pad > 0:
print(f'{retries + 1}: {message} {(pad + 1)*space}', end="\r")
else:
print(f'{retries + 1}: {message}', end="\r")
except:
message = data.getvalue().decode()
message_string = f'{retries + 1}: {message}'
last_line_length = overprint(message_string, last_line_length, end="\r")

if 'failed' in message:
failed = True
data = message
data = empty_df

if retries > max_retries:

clear_output(wait=True)
print(f'Max attempts ({max_retries}) to retrieve {name} exceeded: aborting retrieval', end="\r")

failed = True
data = message
data = empty_df

retries += 1
time.sleep(delay)

return data
return data
4 changes: 4 additions & 0 deletions viewser/remotes.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ def update_kwargs(kwargs, data):
.maybe(request_kwargs, curry(update_kwargs, request_kwargs))
)

print(request_args)
print(request_kwargs)

try:
response = requests.request(*request_args,**request_kwargs)
except requests.exceptions.ConnectionError:
Expand Down Expand Up @@ -189,6 +192,7 @@ def request(
data.then(str).then(lambda r: f"POSTing {r}").then(logger.debug)

url = make_url(base_url, path, parameters)
print('url:',url)
response = url.then(curry(make_request, method = method, json_data = data))
return compose_checks(checks)(response)

Expand Down

0 comments on commit a356cb4

Please sign in to comment.