Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inspect progress bars with a global handler #695

Merged
merged 15 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/electron/frontend/core/components/utils/progress.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export const createProgressPopup = async (options, tqdmCallback) => {
return { ...commonReturnValue, id, close };
};

const eventsURL = new URL("/neuroconv/events", baseUrl).href;
const progressEventsUrl = new URL("/neuroconv/events/progress", baseUrl).href;

class ProgressHandler {
constructor(props) {
Expand All @@ -112,4 +112,4 @@ class ProgressHandler {
removeEventListener = (...args) => this.source.removeEventListener(...args);
}

export const progressHandler = new ProgressHandler({ url: eventsURL });
export const progressHandler = new ProgressHandler({ url: progressEventsUrl });
3 changes: 2 additions & 1 deletion src/pyflask/manageNeuroconv/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
inspect_multiple_filesystem_objects,
inspect_nwb_file,
inspect_nwb_folder,
listen_to_neuroconv_events,
listen_to_neuroconv_progress_events,
locate_data,
progress_handler,
upload_folder_to_dandi,
upload_multiple_filesystem_objects_to_dandi,
upload_project_to_dandi,
Expand Down
2 changes: 1 addition & 1 deletion src/pyflask/manageNeuroconv/info/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .sse import announcer, format_sse
from .sse import format_sse
from .urls import (
CONVERSION_SAVE_FOLDER_PATH,
GUIDE_ROOT_FOLDER,
Expand Down
22 changes: 0 additions & 22 deletions src/pyflask/manageNeuroconv/info/sse.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,8 @@
import json
import queue


def format_sse(data: str, event=None) -> str:
msg = f"data: {json.dumps(data)}\n\n"
if event is not None:
msg = f"event: {event}\n{msg}"
return msg


class MessageAnnouncer:
def __init__(self):
self.listeners = []

def listen(self):
q = queue.Queue(maxsize=5)
self.listeners.append(q)
return q

def announce(self, msg, event=None):
msg = format_sse(msg, event)
for i in reversed(range(len(self.listeners))):
try:
self.listeners[i].put_nowait(msg)
except queue.Full:
del self.listeners[i]


announcer = MessageAnnouncer()
46 changes: 15 additions & 31 deletions src/pyflask/manageNeuroconv/manage_neuroconv.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
from shutil import copytree, rmtree
from typing import Any, Dict, List, Optional, Union

from .info import (
CONVERSION_SAVE_FOLDER_PATH,
GUIDE_ROOT_FOLDER,
STUB_SAVE_FOLDER_PATH,
announcer,
)
from tqdm_publisher import TQDMProgressHandler

from .info import CONVERSION_SAVE_FOLDER_PATH, GUIDE_ROOT_FOLDER, STUB_SAVE_FOLDER_PATH
from .info.sse import format_sse

progress_handler = TQDMProgressHandler()

EXCLUDED_RECORDING_INTERFACE_PROPERTIES = ["contact_vector", "contact_shapes", "group", "location"]

Expand Down Expand Up @@ -727,15 +727,12 @@ def convert_to_nwb(info: dict) -> str:

converter = instantiate_custom_converter(resolved_source_data, info["interfaces"])

def update_conversion_progress(**kwargs):
announcer.announce(dict(**kwargs, nwbfile_path=nwbfile_path), "conversion_progress")

# Assume all interfaces have the same conversion options for now
available_options = converter.get_conversion_options_schema()
options = (
{
interface: (
{"stub_test": info["stub_test"]} # , "iter_opts": {"report_hook": update_conversion_progress}}
{"stub_test": info["stub_test"]}
if available_options.get("properties").get(interface).get("properties", {}).get("stub_test")
else {}
)
Expand Down Expand Up @@ -914,11 +911,11 @@ def upload_project_to_dandi(


# Create an events endpoint
def listen_to_neuroconv_events():
messages = announcer.listen() # returns a queue.Queue
def listen_to_neuroconv_progress_events():
messages = progress_handler.listen() # returns a queue.Queue
while True:
msg = messages.get() # blocks until a new message arrives
yield msg
yield format_sse(msg)


def generate_dataset(input_path: str, output_path: str) -> dict:
Expand Down Expand Up @@ -1041,9 +1038,7 @@ def inspect_all(url, config):

nwbfile_paths = list(Path(path).rglob("*.nwb"))

request_id = config.get("request_id")
if request_id:
config.pop("request_id")
request_id = config.pop("request_id", None)

n_jobs = config.get("n_jobs", -2) # Default to all but one CPU
n_jobs = calculate_number_of_cpu(requested_cpu=n_jobs)
Expand All @@ -1068,7 +1063,7 @@ def inspect_all(url, config):
# Announce directly
def on_progress_update(message):
message["progress_bar_id"] = request_id # Ensure request_id matches
announcer.announce(
progress_handler.announce(
dict(
request_id=request_id,
**message,
Expand All @@ -1086,7 +1081,6 @@ def on_progress_update(message):
i = 0
for future in inspection_iterable:
i += 1
# on_progress_update(dict(progress_bar_id=request_id, format_dict=dict(total=len(futures), n=i)))
for message in future.result():
messages.append(message)

Expand All @@ -1096,25 +1090,15 @@ def on_progress_update(message):
def inspect_nwb_folder(url, payload) -> dict:
from pickle import PicklingError

from nwbinspector import load_config
from nwbinspector.inspector_tools import format_messages, get_report_header
from nwbinspector.nwbinspector import InspectorOutputJSONEncoder

kwargs = dict(
ignore=[
"check_description",
"check_data_orientation",
], # TODO: remove when metadata control is exposed
config=load_config(filepath_or_keyword="dandi"),
**payload,
)

try:
messages = inspect_all(url, kwargs)
messages = inspect_all(url, payload)
except PicklingError as exception:
if "attribute lookup auto_parse_some_output on nwbinspector.register_checks failed" in str(exception):
del kwargs["n_jobs"]
messages = inspect_all(url, kwargs)
del payload["n_jobs"]
messages = inspect_all(url, payload)
else:
raise exception
except Exception as exception:
Expand Down
20 changes: 9 additions & 11 deletions src/pyflask/namespaces/neuroconv.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
inspect_multiple_filesystem_objects,
inspect_nwb_file,
inspect_nwb_folder,
listen_to_neuroconv_events,
listen_to_neuroconv_progress_events,
locate_data,
progress_handler,
upload_folder_to_dandi,
upload_multiple_filesystem_objects_to_dandi,
upload_project_to_dandi,
validate_metadata,
)
from manageNeuroconv.info import announcer

neuroconv_namespace = Namespace("neuroconv", description="Neuroconv neuroconv_namespace for the NWB GUIDE.")

Expand Down Expand Up @@ -158,17 +158,16 @@ def post(self):
class InspectNWBFolder(Resource):
@neuroconv_namespace.doc(responses={200: "Success", 400: "Bad Request", 500: "Internal server error"})
def post(self):
url = f"{request.url_root}neuroconv/announce"
url = f"{request.url_root}neuroconv/announce/progress"
return inspect_nwb_folder(url, neuroconv_namespace.payload)


@neuroconv_namespace.route("/announce")
@neuroconv_namespace.route("/announce/progress")
class InspectNWBFolder(Resource):
@neuroconv_namespace.doc(responses={200: "Success", 400: "Bad Request", 500: "Internal server error"})
def post(self):
data = neuroconv_namespace.payload
announcer.announce(data)

progress_handler.announce(data)
return True


Expand All @@ -178,7 +177,7 @@ class InspectNWBFolder(Resource):
def post(self):
from os.path import isfile

url = f"{request.url_root}neuroconv/announce"
url = f"{request.url_root}neuroconv/announce/progress"

paths = neuroconv_namespace.payload["paths"]

Expand Down Expand Up @@ -207,9 +206,8 @@ def post(self):


# Create an events endpoint
# announcer.announce('test', 'publish')
@neuroconv_namespace.route("/events", methods=["GET"])
class Events(Resource):
@neuroconv_namespace.route("/events/progress", methods=["GET"])
class ProgressEvents(Resource):
@neuroconv_namespace.doc(responses={200: "Success", 400: "Bad Request", 500: "Internal server error"})
def get(self):
return Response(listen_to_neuroconv_events(), mimetype="text/event-stream")
return Response(listen_to_neuroconv_progress_events(), mimetype="text/event-stream")
Loading