From a024806201d026e17a18370b27dfdda4bd0c1a0e Mon Sep 17 00:00:00 2001 From: rev1si0n Date: Sun, 24 Nov 2024 15:53:15 +0800 Subject: [PATCH] 7.75 auto-commit --- CHANGELOG.txt | 5 ++ lamda/__init__.py | 2 +- lamda/client.py | 190 ++++++++++++++++++++++++++++++++++++++- lamda/rpc/services.proto | 1 + setup.py | 6 +- 5 files changed, 199 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.txt b/CHANGELOG.txt index 97c67f6..bc241c6 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -1,3 +1,8 @@ +7.75 +* 新增 OCR识别接口 +* 新增 get_application_by_name +* 更新部分子模块及依赖版本 + 7.73 * 修复部分应用白屏的问题 diff --git a/lamda/__init__.py b/lamda/__init__.py index edb2f00..05233db 100644 --- a/lamda/__init__.py +++ b/lamda/__init__.py @@ -2,4 +2,4 @@ # # Distributed under MIT license. # See file LICENSE for detail or copy at https://opensource.org/licenses/MIT -__version__ = "7.73" +__version__ = "7.75" diff --git a/lamda/client.py b/lamda/client.py index 80d41f0..c503e20 100644 --- a/lamda/client.py +++ b/lamda/client.py @@ -44,7 +44,9 @@ from . import __version__ from . types import AttributeDict, BytesIO -from . exceptions import UnHandledException, DuplicateEntryError, InvalidArgumentError +from . exceptions import (UnHandledException, DuplicateEntryError, + InvalidArgumentError, UiObjectNotFoundException, + IllegalStateException) from . import exceptions handler = logging.StreamHandler() @@ -63,6 +65,8 @@ "GproxyType", "GrantType", "Group", + "CustomOcrBackend", + "OcrEngine", "Key", "Keys", "KeyCode", @@ -168,6 +172,18 @@ def center(b): y = int(b.top + (b.bottom - b.top)/2) return Point(x=x, y=y) +def contain(a, b): + return all([b.top >= a.top, + b.left >= a.left, + b.bottom <= a.bottom, + b.right <= a.right]) + +def equal(a, b): + return all([b.top == a.top, + b.left == a.left, + b.bottom == a.bottom, + b.right == a.right]) + def corner(b, position): ca, cb = position.split("-") return Point(x=getattr(b, cb), @@ -248,6 +264,8 @@ def corner(b, position): Bound.center = center Bound.corner = corner +Bound.__contains__ = contain +Bound.__eq__ = equal def load_proto(name): @@ -267,6 +285,13 @@ def Selector(**kwargs): return sel +class CustomOcrBackend(object): + def __init__(self, *args, **kwargs): + raise NotImplementedError + def ocr(self, image): + raise NotImplementedError + + class BaseCryptor(object): def __str__(self): return "{}".format(self.__class__.__name__) @@ -1100,6 +1125,11 @@ def current_application(self): app = self.__call__(top.packageName) app.activity = top.activity return app + def get_application_by_name(self, name, user=0): + req = protos.String(value=name) + r = self.stub.getIdentifierByLabel(req) + app = self.__call__(r.value, user=user) + return app def enumerate_running_processes(self): """ 列出设备上所有正在运行的安卓应用进程 @@ -1804,6 +1834,140 @@ def save_config(self): raise NotImplementedError +class OcrOperator(object): + def __init__(self, device, elements=None, + **kwargs): + self.elements = elements + self.index = kwargs.pop("index", 0) + self.func, self.rule = kwargs.popitem() + self.match = getattr(self, self.func) + self.device = device + def text(self, item): + return self.rule == item["text"] + def textMatches(self, item): + return bool(re.match(self.rule, item["text"], + re.DOTALL)) + def textContains(self, item): + return self.rule in item["text"] + def find_target_item(self): + m = [e for e in self.elements \ + if self.match(e)] + o = (m and len(m) > self.index) != True + return None if o else m[self.index] + def find_item_or_throw(self): + item = self.find_target_item() + msg = "OcrSelector[{}={}]".format(self.func, self.rule) + item or self.throw(UiObjectNotFoundException, msg) + return item + def find_cb(self, func, ret, *args): + item = self.find_target_item() + return func(item, *args) if item else ret + def find_or_throw_cb(self, func, *args): + item = self.find_item_or_throw() + return func(item, *args) + def throw(self, exception, *args): + raise exception(*args) + def _screenshot(self, item, quality): + return self.device.screenshot(quality, + bound=item["bound"]) + def _click(self, item): + point = item["bound"].center() + return self.device.click(point) + def __str__(self): + return "Ocr: {}={}".format(self.func, self.rule) + __repr__ = __str__ + def exists(self): + """ + OCR - 检查元素是否存在 + """ + return bool(self.find_target_item()) + def exist(self): + """ + OCR - 检查元素是否存在 + """ + return self.exists() + def click(self): + """ + OCR - 点击元素(不存在则报错) + """ + return self.find_or_throw_cb(self._click) + def click_exists(self): + """ + OCR - 点击元素(不存在将不会产生异常) + """ + return self.find_cb(self._click, False) + def click_exist(self): + """ + OCR - 点击元素(不存在将不会产生异常) + """ + return self.click_exists() + def screenshot(self, quality=100): + """ + OCR - 对元素进行截图 + """ + return self.find_or_throw_cb(self._screenshot, + quality) + def take_screenshot(self, quality=100): + """ + OCR - 对元素进行截图 + """ + return self.screenshot(quality) + def info(self): + """ + OCR - 获取匹配元素的信息 + """ + item = self.find_item_or_throw() + return item + + +class OcrEngine(object): + def __init__(self, service, *args, + **kwargs): + args = list(args) + if type(service) == type: + args.insert(0, service) + service = "custom" + func = getattr(self, "init_{}".format(service)) + func(*args, **kwargs) + def init_paddleocr(self, *args, **kwargs): + from paddleocr import PaddleOCR + self._service = PaddleOCR(*args, **kwargs) + self._ocr = self.ocr_paddleocr + def init_easyocr(self, *args, **kwargs): + from easyocr import Reader + self._service = Reader(*args, **kwargs) + self._ocr = self.ocr_easyocr + def init_custom(self, service, *args, **kwargs): + self._service = service(*args, **kwargs) + self._ocr = self.ocr_custom + def ocr_custom(self, image): + result = self._service.ocr(image) + return result + def ocr_paddleocr(self, image): + r = self._service.ocr(image) + n = bool(r and r[0] and type(r[0][-1])==float) + result = (r if n else r[0]) or [] + output = [[n[0], n[1][0], n[1][1]] for n in result] + return output + def ocr_easyocr(self, image): + result = self._service.readtext(image) + return result + def ocr(self, screenshot): + img = screenshot.getvalue() + result = self._ocr(img) or [] + output = [self.format(*n) for n in result] + return output + def format(self, box, text, confidence): + bound = Bound() + bound.left = int(min(p[0] for p in box)) + bound.top = int(min(p[1] for p in box)) + bound.bottom = int(max(p[1] for p in box)) + bound.right = int(max(p[0] for p in box)) + info = dict(text=text, confidence=confidence, + bound=bound) + return info + + class Device(object): def __init__(self, host, port=65000, certificate=None, @@ -1827,6 +1991,8 @@ def __init__(self, host, port=65000, interceptors = [ClientSessionMetadataInterceptor(session), GrpcRemoteExceptionInterceptor(), ClientLoggingInterceptor()] + self._ocr = None + self._ocr_img_quality = 75 self.channel = grpc.intercept_channel(self._chan, *interceptors) self.session = session @@ -1897,6 +2063,8 @@ def get_last_activities(self, count=3): return self.stub("Application").get_last_activities(count=count) def start_activity(self, **activity): return self.stub("Application").start_activity(**activity) + def get_application_by_name(self, name): + return self.stub("Application").get_application_by_name(name) def application(self, applicationId, user=0): return self.stub("Application")(applicationId, user=user) # 快速调用: Util @@ -2045,6 +2213,26 @@ def device_info(self): return self.stub("UiAutomator").device_info() def __call__(self, **kwargs): return self.stub("UiAutomator")(**kwargs) + # OCR 功能扩展 + def ocr(self, index=0, **kwargs): + if not isinstance(self._ocr, OcrEngine): + raise IllegalStateException("Ocr engine is not setted up") + if any(r not in ["text", "textContains", "textMatches"] \ + for r in kwargs.keys()): + raise InvalidArgumentError("Only text* matches are supported") + if len(kwargs) != 1: + raise InvalidArgumentError("Only or at least one rule can be used") + image = self.screenshot(self._ocr_img_quality) + return OcrOperator(self, + elements=self._ocr.ocr(image), + index=index, + **kwargs + ) + def setup_ocr_backend(self, service, *args, quality=75, + **kwargs): + self._ocr_img_quality = quality + self._ocr = OcrEngine(service, *args, + **kwargs) # 日志打印 def set_debug_log_enabled(self, enable): level = logging.DEBUG if enable else logging.WARN diff --git a/lamda/rpc/services.proto b/lamda/rpc/services.proto index 8317d36..0f5bc61 100644 --- a/lamda/rpc/services.proto +++ b/lamda/rpc/services.proto @@ -49,6 +49,7 @@ service Application { rpc addToDozeModeWhiteList(ApplicationRequest) returns (Boolean) {} rpc removeFromDozeModeWhiteList(ApplicationRequest) returns (Boolean) {} + rpc getIdentifierByLabel(String) returns (String) {} } service Debug { diff --git a/setup.py b/setup.py index b6267d0..9950c94 100644 --- a/setup.py +++ b/setup.py @@ -19,9 +19,9 @@ ], }, install_requires= [ - "grpcio-tools>=1.35.0,<1.60.0", - "grpc-interceptor>=0.13.0,<=0.15.2", - "grpcio>=1.35.0,<1.60.0", + "grpcio-tools>=1.35.0,<=1.68.0", + "grpc-interceptor>=0.13.0,<=0.15.4", + "grpcio>=1.35.0,<=1.68.0", "cryptography>=35.0.0", "msgpack>=1.0.0", "asn1crypto>=1.0.0,<2",