diff --git a/robyn/__init__.py b/robyn/__init__.py index 27c8d89b..0808dc08 100644 --- a/robyn/__init__.py +++ b/robyn/__init__.py @@ -5,8 +5,8 @@ from pathlib import Path from typing import Callable, List, Optional, Tuple, Union -import multiprocess as mp -from nestd import get_all_nested +import multiprocess as mp # type: ignore +from nestd import get_all_nested # type: ignore from robyn import status_codes from robyn.argument_parser import Config @@ -42,7 +42,7 @@ def __init__( self, file_object: str, config: Config = Config(), - openapi_file_path: str = None, + openapi_file_path: Optional[str] = None, openapi: OpenAPI = OpenAPI(), dependencies: DependencyMap = DependencyMap(), ) -> None: @@ -79,7 +79,7 @@ def __init__( self.response_headers: Headers = Headers({}) self.excluded_response_headers_paths: Optional[List[str]] = None self.directories: List[Directory] = [] - self.event_handlers = {} + self.event_handlers: dict = {} self.exception_handler: Optional[Callable] = None self.authentication_handler: Optional[AuthenticationHandler] = None diff --git a/robyn/argument_parser.py b/robyn/argument_parser.py index 88c93ae7..2232c55b 100644 --- a/robyn/argument_parser.py +++ b/robyn/argument_parser.py @@ -103,7 +103,8 @@ def __init__(self) -> None: if self.fast: # doing this here before every other check # so that processes, workers and log_level can be overridden - self.processes = self.processes or ((os.cpu_count() * 2) + 1) or 1 + cpu_count: int = os.cpu_count() or 1 + self.processes = self.processes or ((cpu_count * 2) + 1) or 1 self.workers = self.workers or 2 self.log_level = self.log_level or "WARNING" diff --git a/robyn/authentication.py b/robyn/authentication.py index 5dd76af5..dc01edf7 100644 --- a/robyn/authentication.py +++ b/robyn/authentication.py @@ -81,7 +81,7 @@ class BearerGetter(TokenGetter): @classmethod def get_token(cls, request: Request) -> Optional[str]: - if "authorization" in request.headers: + if request.headers.contains("authorization"): authorization_header = request.headers.get("authorization") else: authorization_header = None diff --git a/robyn/dependency_injection.py b/robyn/dependency_injection.py index 4cb18e4b..c1442db6 100644 --- a/robyn/dependency_injection.py +++ b/robyn/dependency_injection.py @@ -4,7 +4,7 @@ class DependencyMap: - def __init__(self): + def __init__(self) -> None: self.global_dependency_map: dict[str, Any] = {} # {'router': {'dependency_name': dependency_class} self.router_dependency_map: dict[str, dict[str, Any]] = {} diff --git a/robyn/openapi.py b/robyn/openapi.py index 5d4868c3..be5aeecb 100644 --- a/robyn/openapi.py +++ b/robyn/openapi.py @@ -5,9 +5,9 @@ from importlib import resources from inspect import Signature from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, TypedDict +from typing import Any, Callable, Dict, List, Optional, Tuple, TypedDict -from robyn.responses import FileResponse, html +from robyn.responses import html from robyn.robyn import QueryParams, Response from robyn.types import Body @@ -239,13 +239,13 @@ def get_path_obj( query_params: Optional[TypedDict], request_body: Optional[TypedDict], return_annotation: Optional[TypedDict], - ) -> (str, dict): + ) -> Tuple[str, dict]: """ Get the "path" openapi object according to spec @param endpoint: str the endpoint to be added @param name: str the name of the endpoint - @param description: Optional[str] short description of the endpoint (to be fetched from the endpoint defenition by default) + @param description: Optional[str] short description of the endpoint (to be fetched from the endpoint definition by default) @param tags: List[str] for grouping of endpoints @param query_params: Optional[TypedDict] query params for the function @param request_body: Optional[TypedDict] request body for the function @@ -328,7 +328,7 @@ def get_path_obj( openapi_path_object["requestBody"] = request_body_object - response_schema = {} + response_schema: dict = {} response_type = "text/plain" if return_annotation and return_annotation is not Response: @@ -416,7 +416,7 @@ def override_openapi(self, openapi_json_spec_path: Path): self.openapi_spec = dict(json_file_content) self.openapi_file_override = True - def get_openapi_docs_page(self) -> FileResponse: + def get_openapi_docs_page(self) -> Response: """ Handler to the swagger html page to be deployed to the endpoint `/docs` @return: FileResponse the swagger html page diff --git a/robyn/processpool.py b/robyn/processpool.py index 1391d21b..54673889 100644 --- a/robyn/processpool.py +++ b/robyn/processpool.py @@ -4,7 +4,7 @@ import webbrowser from typing import Dict, List, Optional -from multiprocess import Process +from multiprocess import Process # type: ignore from robyn.events import Events from robyn.logger import logger @@ -80,7 +80,7 @@ def init_processpool( response_headers: Headers, excluded_response_headers_paths: Optional[List[str]], ) -> List[Process]: - process_pool = [] + process_pool: List = [] if sys.platform.startswith("win32") or processes == 1: spawn_process( directories, diff --git a/robyn/reloader.py b/robyn/reloader.py index bef6197c..ddb52636 100644 --- a/robyn/reloader.py +++ b/robyn/reloader.py @@ -118,7 +118,7 @@ def __init__(self, file_path: str, directory_path: str) -> None: self.file_path = file_path self.directory_path = directory_path self.process = None # Keep track of the subprocess - self.built_rust_binaries = [] # Keep track of the built rust binaries + self.built_rust_binaries: List = [] # Keep track of the built rust binaries self.last_reload = time.time() # Keep track of the last reload. EventHandler is initialized with the process.