From dd1fba14baa3fff7fffee885f056d6412a5f4580 Mon Sep 17 00:00:00 2001 From: Alec Mev Date: Wed, 10 Aug 2022 03:50:58 +0300 Subject: [PATCH] Refactor transport processors --- plugin/core/transports.py | 173 ++++++++++++++++++-------------------- plugin/core/types.py | 20 ++--- sublime-package.json | 11 +++ tests/test_protocol.py | 6 +- 4 files changed, 108 insertions(+), 102 deletions(-) diff --git a/plugin/core/transports.py b/plugin/core/transports.py index 43dadd6fa..24b94fe7d 100644 --- a/plugin/core/transports.py +++ b/plugin/core/transports.py @@ -5,7 +5,7 @@ from contextlib import closing from functools import partial from queue import Queue -import http +import http.client import json import multiprocessing.connection import os @@ -49,70 +49,99 @@ def on_stderr_message(self, message: str) -> None: class AbstractProcessor(Generic[T]): - def write_data(self, writer: IO[bytes], data: T, is_node_ipc: bool) -> None: + def write_data(self, data: T) -> None: raise NotImplementedError() - def read_data(self, reader: IO[bytes], is_node_ipc: bool) -> Optional[T]: + def read_data(self) -> Optional[T]: raise NotImplementedError() -class JsonRpcProcessor(AbstractProcessor[Dict[str, Any]]): +def encode_payload(data: Dict[str, Any]) -> bytes: + return json.dumps( + data, + ensure_ascii=False, + check_circular=False, + separators=(',', ':') + ).encode('utf-8') - def write_data(self, writer: IO[bytes], data: Dict[str, Any], is_node_ipc: bool) -> None: - body = self._encode(data) - if not is_node_ipc: - writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body)) - else: - writer.write(body + b"\n") - def read_data(self, reader: IO[bytes], is_node_ipc: bool) -> Optional[Dict[str, Any]]: - if not is_node_ipc: - headers = http.client.parse_headers(reader) # type: ignore - try: - body = reader.read(int(headers.get("Content-Length"))) - except TypeError: - # Expected error on process stopping. Stop the read loop. - raise StopLoopError() - else: - body = reader.readline() +def decode_payload(message: bytes) -> Optional[Dict[str, Any]]: + try: + return json.loads(message.decode('utf-8')) + except Exception as ex: + exception_log("JSON decode error", ex) + return None + + +class StandardProcessor(AbstractProcessor[Dict[str, Any]]): + def __init__(self, reader: Optional[IO[bytes]], writer: IO[bytes]): + if not reader or not writer: + raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer)) + self._reader = reader + self._writer = writer + + def write_data(self, data: Dict[str, Any]) -> None: + body = encode_payload(data) + self._writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body)) + self._writer.flush() + + def read_data(self) -> Optional[Dict[str, Any]]: + headers = http.client.parse_headers(self._reader) # type: ignore try: - return self._decode(body) - except Exception as ex: - exception_log("JSON decode error", ex) - return None - - @staticmethod - def _encode(data: Dict[str, Any]) -> bytes: - return json.dumps( - data, - ensure_ascii=False, - check_circular=False, - separators=(',', ':') - ).encode('utf-8') - - @staticmethod - def _decode(message: bytes) -> Dict[str, Any]: - return json.loads(message.decode('utf-8')) + body = self._reader.read(int(headers.get("Content-Length"))) + except TypeError: + # Expected error on process stopping. Stop the read loop. + raise StopLoopError() + return decode_payload(body) + + +class NodeIpcProcessor(AbstractProcessor[Dict[str, Any]]): + _buf = bytearray() + _lines = 0 + + def __init__(self, conn: multiprocessing.connection._ConnectionBase): + self._conn = conn + + def write_data(self, data: Dict[str, Any]) -> None: + body = encode_payload(data) + b"\n" + while len(body): + n = self._conn._write(self._conn.fileno(), body) # type: ignore + body = body[n:] + + def read_data(self) -> Optional[Dict[str, Any]]: + while self._lines == 0: + chunk = self._conn._read(self._conn.fileno(), 65536) # type: ignore + if len(chunk) == 0: + # EOF reached: https://docs.python.org/3/library/os.html#os.read + raise StopLoopError() + + self._buf += chunk + self._lines += chunk.count(b'\n') + + self._lines -= 1 + message, _, self._buf = self._buf.partition(b'\n') + return decode_payload(message) class ProcessTransport(Transport[T]): - def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: IO[bytes], - writer: IO[bytes], stderr: Optional[IO[bytes]], processor: AbstractProcessor[T], - callback_object: TransportCallbacks[T], is_node_ipc: bool) -> None: + def __init__(self, + name: str, + process: subprocess.Popen, + socket: Optional[socket.socket], + stderr: Optional[IO[bytes]], + processor: AbstractProcessor[T], + callback_object: TransportCallbacks[T]) -> None: self._closed = False self._process = process self._socket = socket - self._reader = reader - self._writer = writer self._stderr = stderr self._processor = processor self._reader_thread = threading.Thread(target=self._read_loop, name='{}-reader'.format(name)) self._writer_thread = threading.Thread(target=self._write_loop, name='{}-writer'.format(name)) self._stderr_thread = threading.Thread(target=self._stderr_loop, name='{}-stderr'.format(name)) self._callback_object = weakref.ref(callback_object) - self._is_node_ipc = is_node_ipc self._send_queue = Queue(0) # type: Queue[Union[T, None]] self._reader_thread.start() self._writer_thread.start() @@ -144,8 +173,8 @@ def __del__(self) -> None: def _read_loop(self) -> None: try: - while self._reader: - payload = self._processor.read_data(self._reader, self._is_node_ipc) + while True: + payload = self._processor.read_data() if payload is None: continue @@ -194,13 +223,11 @@ def invoke() -> None: def _write_loop(self) -> None: exception = None # type: Optional[Exception] try: - while self._writer: + while True: d = self._send_queue.get() if d is None: break - self._processor.write_data(self._writer, d, self._is_node_ipc) - if not self._is_node_ipc: - self._writer.flush() + self._processor.write_data(d) except (BrokenPipeError, AttributeError): pass except Exception as ex: @@ -228,35 +255,6 @@ def _stderr_loop(self) -> None: self._send_queue.put_nowait(None) -# Can be a singleton since it doesn't hold any state. -json_rpc_processor = JsonRpcProcessor() - - -class NodeIpcIO(): - _buf = bytearray() - _lines = 0 - - def __init__(self, conn: multiprocessing.connection._ConnectionBase): - self._conn = conn - - # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L378-L392 - def readline(self) -> bytearray: - while self._lines == 0: - chunk = self._conn._read(self._conn.fileno(), 65536) # type: ignore - self._buf += chunk - self._lines += chunk.count(b'\n') - - self._lines -= 1 - line, _, self._buf = self._buf.partition(b'\n') - return line - - # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L369-L376 - def write(self, data: bytes) -> None: - while len(data): - n = self._conn._write(self._conn.fileno(), data) # type: ignore - data = data[n:] - - def create_transport(config: TransportConfig, cwd: Optional[str], callback_object: TransportCallbacks) -> Transport[Dict[str, Any]]: stderr = subprocess.PIPE @@ -292,24 +290,22 @@ def start_subprocess() -> subprocess.Popen: config.listener_socket, start_subprocess ) + processor = StandardProcessor(reader, writer) # type: AbstractProcessor else: process = start_subprocess() if config.tcp_port: sock = _connect_tcp(config.tcp_port) if sock is None: raise RuntimeError("Failed to connect on port {}".format(config.tcp_port)) - reader = sock.makefile('rwb') # type: ignore - writer = reader + reader = writer = sock.makefile('rwb') + processor = StandardProcessor(reader, writer) elif not config.node_ipc: - reader = process.stdout # type: ignore - writer = process.stdin # type: ignore + processor = StandardProcessor(process.stdout, process.stdin) # type: ignore else: - reader = writer = NodeIpcIO(config.node_ipc.parent_conn) # type: ignore - if not reader or not writer: - raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer)) + processor = NodeIpcProcessor(config.node_ipc.parent_conn) + stderr_reader = process.stdout if config.node_ipc else process.stderr - return ProcessTransport(config.name, process, sock, reader, writer, stderr_reader, json_rpc_processor, - callback_object, bool(config.node_ipc)) + return ProcessTransport(config.name, process, sock, stderr_reader, processor, callback_object) _subprocesses = weakref.WeakSet() # type: weakref.WeakSet[subprocess.Popen] @@ -403,8 +399,7 @@ def start_in_background(d: _SubprocessData) -> None: # Await one client connection (blocking!) sock, _ = listener_socket.accept() thread.join() - reader = sock.makefile('rwb') # type: IO[bytes] - writer = reader + reader = writer = sock.makefile('rwb') assert data.process return data.process, sock, reader, writer diff --git a/plugin/core/types.py b/plugin/core/types.py index e609243a6..9ebb810bd 100644 --- a/plugin/core/types.py +++ b/plugin/core/types.py @@ -607,7 +607,7 @@ def map_from_remote_to_local(self, uri: str) -> Tuple[str, bool]: return _translate_path(uri, self._remote, self._local) -NodeIpc = collections.namedtuple('NodeIpc', 'parent_conn,child_conn') +NodeIpcPipe = collections.namedtuple('NodeIpcPipe', 'parent_conn,child_conn') class TransportConfig: @@ -620,15 +620,10 @@ def __init__( tcp_port: Optional[int], env: Dict[str, str], listener_socket: Optional[socket.socket], - node_ipc: Optional[NodeIpc] + node_ipc: Optional[NodeIpcPipe] ) -> None: if not command and not tcp_port: raise ValueError('neither "command" nor "tcp_port" is provided; cannot start a language server') - if node_ipc and (tcp_port or listener_socket): - raise ValueError( - '"tcp_port" and "listener_socket" can\'t be provided in "--node-ipc" mode; ' + - 'cannot start a language server' - ) self.name = name self.command = command self.tcp_port = tcp_port @@ -644,6 +639,7 @@ def __init__(self, priority_selector: Optional[str] = None, schemes: Optional[List[str]] = None, command: Optional[List[str]] = None, + use_node_ipc: bool = False, binary_args: Optional[List[str]] = None, # DEPRECATED tcp_port: Optional[int] = None, auto_complete_selector: Optional[str] = None, @@ -668,6 +664,7 @@ def __init__(self, else: assert isinstance(binary_args, list) self.command = binary_args + self.use_node_ipc = use_node_ipc self.tcp_port = tcp_port self.auto_complete_selector = auto_complete_selector self.enabled = enabled @@ -701,9 +698,10 @@ def from_sublime_settings(cls, name: str, s: sublime.Settings, file: str) -> "Cl priority_selector=_read_priority_selector(s), schemes=s.get("schemes"), command=read_list_setting(s, "command", []), + use_node_ipc=bool(s.get("use_node_ipc", False)), tcp_port=s.get("tcp_port"), auto_complete_selector=s.get("auto_complete_selector"), - # Default to True, because an LSP plugin is enabled iff it is enabled as a Sublime package. + # Default to True, because an LSP plugin is enabled if it is enabled as a Sublime package. enabled=bool(s.get("enabled", True)), init_options=init_options, settings=settings, @@ -731,6 +729,7 @@ def from_dict(cls, name: str, d: Dict[str, Any]) -> "ClientConfig": priority_selector=_read_priority_selector(d), schemes=schemes, command=d.get("command", []), + use_node_ipc=d.get("use_node_ipc", False), tcp_port=d.get("tcp_port"), auto_complete_selector=d.get("auto_complete_selector"), enabled=d.get("enabled", False), @@ -758,6 +757,7 @@ def from_config(cls, src_config: "ClientConfig", override: Dict[str, Any]) -> "C priority_selector=_read_priority_selector(override) or src_config.priority_selector, schemes=override.get("schemes", src_config.schemes), command=override.get("command", src_config.command), + use_node_ipc=override.get("use_node_ipc", src_config.use_node_ipc), tcp_port=override.get("tcp_port", src_config.tcp_port), auto_complete_selector=override.get("auto_complete_selector", src_config.auto_complete_selector), enabled=override.get("enabled", src_config.enabled), @@ -803,8 +803,8 @@ def resolve_transport_config(self, variables: Dict[str, str]) -> TransportConfig else: env[key] = sublime.expand_variables(value, variables) node_ipc = None - if '--node-ipc' in command: - node_ipc = NodeIpc(*multiprocessing.Pipe()) + if self.use_node_ipc: + node_ipc = NodeIpcPipe(*multiprocessing.Pipe()) env["NODE_CHANNEL_FD"] = str(node_ipc.child_conn.fileno()) return TransportConfig(self.name, command, tcp_port, env, listener_socket, node_ipc) diff --git a/sublime-package.json b/sublime-package.json index 73927b458..ed3b75773 100644 --- a/sublime-package.json +++ b/sublime-package.json @@ -75,6 +75,11 @@ }, "markdownDescription": "The command to start the language server." }, + "ClientUseNodeIpc": { + "type": "boolean", + "default": false, + "markdownDescription": "Communicate with the language server over Node.js IPC. This lets the server print to stdout without disrupting the LSP communication. It's non-standard, but is used by VSCode. The command must be adjusted accordingly, e.g. `--stdio` must be replaced with `--node-ipc` in case of vscode-eslint. `tcp_port` is ignored if this is enabled." + }, "ClientEnabled": { "type": "boolean", "default": false, @@ -156,6 +161,9 @@ "command": { "$ref": "sublime://settings/LSP#/definitions/ClientCommand" }, + "use_node_ipc": { + "$ref": "sublime://settings/LSP#/definitions/ClientUseNodeIpc" + }, "enabled": { "$ref": "sublime://settings/LSP#/definitions/ClientEnabled" }, @@ -555,6 +563,9 @@ "command": { "$ref": "sublime://settings/LSP#/definitions/ClientCommand" }, + "use_node_ipc": { + "$ref": "sublime://settings/LSP#/definitions/ClientUseNodeIpc" + }, "enabled": { "$ref": "sublime://settings/LSP#/definitions/ClientEnabled" }, diff --git a/tests/test_protocol.py b/tests/test_protocol.py index 8e14f1d6e..198e7563e 100644 --- a/tests/test_protocol.py +++ b/tests/test_protocol.py @@ -1,5 +1,5 @@ from LSP.plugin.core.protocol import Point, Position, Range, RangeLsp, Request, Notification -from LSP.plugin.core.transports import JsonRpcProcessor +from LSP.plugin.core.transports import encode_payload, decode_payload import unittest @@ -129,9 +129,9 @@ def test_extend(self) -> None: class EncodingTests(unittest.TestCase): def test_encode(self) -> None: - encoded = JsonRpcProcessor._encode({"text": "😃"}) + encoded = encode_payload({"text": "😃"}) self.assertEqual(encoded, b'{"text":"\xF0\x9F\x98\x83"}') - decoded = JsonRpcProcessor._decode(encoded) + decoded = decode_payload(encoded) self.assertEqual(decoded, {"text": "😃"})