From 82ef1f958b6d0d5a88150db282c12224b2e603d1 Mon Sep 17 00:00:00 2001 From: codeskyblue Date: Wed, 22 May 2024 15:37:58 +0800 Subject: [PATCH] remove xpath useless method (#976) * add RPCStackOverflowError * remove useless code, fix find_last_match * add support AND OR for xpath * fix long_click, use version_compat to check apk version * add more tests --- XPATH.md | 107 +++++---- docs/2to3.md | 11 +- mobile_tests/test_utils.py | 11 +- mobile_tests/test_xpath.py | 20 +- tests/test_xpath.py | 40 +++- uiautomator2/__init__.py | 20 +- uiautomator2/abstract.py | 21 +- uiautomator2/assets/sync.sh | 7 +- uiautomator2/core.py | 37 ++-- uiautomator2/exceptions.py | 1 + uiautomator2/utils.py | 26 +++ uiautomator2/version.py | 10 +- uiautomator2/watcher.py | 20 +- uiautomator2/xpath.py | 419 ++++++++++++++++-------------------- 14 files changed, 396 insertions(+), 354 deletions(-) diff --git a/XPATH.md b/XPATH.md index 32cef810..532e97f7 100644 --- a/XPATH.md +++ b/XPATH.md @@ -31,7 +31,6 @@ pip3 install -U uiautomator2 ``` ## 使用方法 -目前该插件已经内置到uiautomator2中了,所以不需要plugin注册了。 ### 简单用法 @@ -73,7 +72,7 @@ sl = d.xpath("@com.example:id/home_searchedit") # sl为XPathSelector对象 # 点击 sl.click() -sl.click(timeout=10) # 指定超时时间 +sl.click(timeout=10) # 指定超时时间, 找不到抛出异常 XPathElementNotFoundError sl.click_exists() # 存在即点击,返回是否点击成功 sl.click_exists(timeout=10) # 等待最多10s钟 @@ -94,7 +93,7 @@ el = sl.get(timeout=15) # 修改默认的等待时间为15s d.xpath.global_set("timeout", 15) -d.xpath.implicitly_wait(15) # 与上一行代码等价 +d.xpath.implicitly_wait(15) # 与上一行代码等价 (TODO: Removed) print(sl.exists) # 返回是否存在 (bool) sl.get_last_match() # 获取上次匹配的XMLElement @@ -111,15 +110,25 @@ for el in d.xpath('//android.widget.EditText').all(): print(el.elem) # 输出lxml解析出来的Node print(el.text) -# 尚未测试的方法 -# 点击位于控件包含坐标(50%, 50%)的方法 -d.xpath("//*").position(0.5, 0.5).click() - # child操作 d.xpath('@android:id/list').child('/android.widget.TextView').click() 等价于 d.xpath('//*[@resource-id="android:id/list"]/android.widget.TextView').all() ``` +高级查找语法 + +> Added in version 3.1 + +```python +# 查找 text=NFC AND id=android:id/item +(d.xpath("NFC") & d.xpath("@android:id/item")).get() + +# 查找 text=NFC OR id=android:id/item +(d.xpath("NFC") | d.xpath("App") | d.xpath("Content")).get() + +# 复杂一点也支持 +((d.xpath("NFC") | d.xpath("@android:id/item")) & d.xpath("//android.widget.TextView")).get() + ### `XMLElement`的操作 ```python @@ -246,6 +255,40 @@ def main(): print("还可以继续滚动") ``` +### `PageSource`对象 +> Added in version 3.1 + +这个属于高级用法,但是这个对象也最初级,几乎所有的函数都依赖它。 + +什么是PageSource? + +PageSource是从d.dump_hierarchy()的返回值初始化来的。主要用于通过XPATH完成元素的查找工作。 + +用法? + +```python +source = d.xpath.get_page_source() + +# find_elements 是核心方法 +elements = source.find_elements('//android.widget.TextView') # List[XMLElement] +for el in elements: + print(el.text) + +# 获取坐标后点击 +x, y = elements[0].center() +d.click(x, y) + +# 多种条件的查询写法 +es1 = source.find_elements('//android.widget.TextView') +es2 = source.find_elements(XPath('@android:id/content').joinpath("//*")) + +# 寻找是TextView但不属于id=android:id/content下的节点 +els = set(es1) - set(es2) + +# 寻找是TextView同事属于id=android:id/content下的节点 +els = set(es1) & set(es2) +``` + ## XPath规则 为了写起脚本来更快,我们自定义了一些简化的xpath规则 @@ -276,12 +319,6 @@ def main(): `%知道%` 匹配包含`知道`的文本,相当于 `//*[contains(text(), '知道')]` -**~~规则5~~(目前该功能已移除)** - -> 另外来自Selenium PageObjects - -`$知道` 匹配 通过`d.xpath.global_set("alias", dict)` dict字典中的内容, 如果不存在将使用`知道`来匹配 - **规则 Last** 会匹配text 和 description字段 @@ -289,7 +326,7 @@ def main(): 如 `搜索` 相当于 XPath `//*[@text="搜索" or @content-desc="搜索" or @resource-id="搜索"]` ## 特殊说明 -- 有时className中包含有`$`字符,这个字符在XML中是不合法的,所以全部替换成了`-` +- 有时className中包含有`$@#&`字符,这个字符在XML中是不合法的,所以全部替换成了`.` ## XPath的一些高级用法 ``` @@ -317,44 +354,4 @@ def main(): - [XPath的一些高级用法-简书](https://www.jianshu.com/p/4fef4142b33f) - [XPath Quicksheet](https://devhints.io/xpath) -如有其他资料,欢迎提[Issues](https://github.com/openatx/uiautomator2/issues/new)补充 - -## 废弃功能 -**别名定义** 从`1.3.4`版本不再支持别名 - -这种写法有点类似selenium中的[PageObjects](https://selenium-python.readthedocs.io/page-objects.html) - -```python -# 这里是Python3的写法,python2的string定义需要改成 u"菜单" 注意前的这个u -d.xpath.global_set("alias", { - "菜单": "@com.netease.cloudmusic:id/qh", # TODO(ssx): maybe we can support P("@com.netease.cloudmusic:id/qh", wait_timeout=2) someday - "设置": "//android.widget.TextView[@text='设置']", -}) - -# 这里需要 $ 开头 -d.xpath("$菜单").click() # 等价于 d.xpath() -d.xpath("$设置").click() - - -d.xpath("$菜单").click() -# 等价于 d.xpath("@com.netease.cloudmusic:id/qh").click() - -d.xpath("$小吃").click() # 在这里会直接跑出XPathError异常,因为并不存在 小吃 这个alias - -# alias_strict 设置项 -d.xpath.global_set("alias_strict", False) # 默认 True -d.xpath("$小吃").click() # 这里就会正常运行 -# 等价于 -d.xpath('//*[@text="小吃" or @content-desc="小吃"]').click() -``` - -# 调整xpath的日志级别 -目前默认logging.INFO - -调整方法 - -```python -import logging - -d.xpath.logger.setLevel(logging.DEBUG) -``` +如有其他资料,欢迎提[Issues](https://github.com/openatx/uiautomator2/issues/new)补充 \ No newline at end of file diff --git a/docs/2to3.md b/docs/2to3.md index c87f723b..41971ccb 100644 --- a/docs/2to3.md +++ b/docs/2to3.md @@ -16,6 +16,7 @@ ## New - Add function enable_pretty_logging - Add class AdbShellError, HierarchyEmptyError, HTTPError +- Add d.xpath.get_page_source() -> PageSource ## Breaking changes (移除的功能) @@ -47,7 +48,7 @@ ### Function removes - Remove function current_app, use app_current instead -- Remove function XPath.apply_watch_from_yaml +- Remove function d.xpath.apply_watch_from_yaml - Remove function healcheck, 原来是未来恢复uiautomator服务用的 - Remove function service(name: str) -> Service, 原本是用于做atx-agent的服务管理 - Remove function app_icon() -> Image, 该函数依赖atx-agent @@ -56,6 +57,14 @@ - Remove function set_new_command_timeout(timeout: int), 用不着了 - Remove function open_identify(), 打开一个比较明显的界面,这个函数出了点毛病,先下掉了 +XPath (d.xpath) methods +- remove dump_hierarchy +- remove get_last_hierarchy +- remove add_event_listener +- remove send_click, send_longclick, send_swipe, send_text, take_screenshot +- remove when, run_watchers, watch_background, watch_stop, watch_clear, sleep_watch +- remove position method, usage like d.xpath(...).position(0.2, 0.2) + ### Command remove - Remove "uiautomator2 healthcheck" - Remove "uiautomator2 identify" diff --git a/mobile_tests/test_utils.py b/mobile_tests/test_utils.py index e6a5e60d..7569b2f1 100644 --- a/mobile_tests/test_utils.py +++ b/mobile_tests/test_utils.py @@ -52,4 +52,13 @@ def call(self): assert 2 == a.n - +def test_is_version_compatiable(): + assert utils.is_version_compatiable("1.0.0", "1.0.0") + assert utils.is_version_compatiable("1.0.0", "1.0.1") + assert utils.is_version_compatiable("1.0.0", "1.2.0") + assert utils.is_version_compatiable("1.0.1", "1.1.0") + + assert not utils.is_version_compatiable("1.0.1", "2.1.0") + assert not utils.is_version_compatiable("1.3.1", "1.3.0") + assert not utils.is_version_compatiable("1.3.1", "1.2.0") + assert not utils.is_version_compatiable("1.3.1", "1.2.2") diff --git a/mobile_tests/test_xpath.py b/mobile_tests/test_xpath.py index 98b7fa11..858deaa4 100644 --- a/mobile_tests/test_xpath.py +++ b/mobile_tests/test_xpath.py @@ -8,20 +8,6 @@ import uiautomator2 as u2 -def test_xpath_selector(dev: u2.Device): - sel1 = dev.xpath("/a") - print(str(sel1), type(str(sel1))) - assert str(sel1).endswith("=/a") - assert str(sel1.child("/b")).endswith("=/a/b") - assert str(sel1).endswith("=/a") # sel1 should not be changed - assert str(sel1.xpath("/b")).endswith("=/a|/b") - assert str(sel1.xpath(["/b", "/c"])).endswith("=/a|/b|/c") - assert sel1.position(0.1, 0.1) != sel1 - assert sel1.fallback("click") != sel1 - with pytest.raises(ValueError): - sel1.fallback("invalid-action") - - def test_get_text(dev: u2.Device): assert dev.xpath("App").get_text() == "App" @@ -55,15 +41,15 @@ def test_element_all(dev: u2.Device): def test_watcher(dev: u2.Device, request): - dev.xpath.when("App").click() - dev.xpath.watch_background(interval=1.0) + dev.watcher.when("App").click() + dev.watcher.start(interval=1.0) event = threading.Event() def _set_event(e): e.set() - dev.xpath.when("Action Bar").call(partial(_set_event, event)) + dev.watcher.when("Action Bar").call(partial(_set_event, event)) assert event.wait(5.0), "xpath not trigger callback" diff --git a/tests/test_xpath.py b/tests/test_xpath.py index 5a14a618..2353a39c 100644 --- a/tests/test_xpath.py +++ b/tests/test_xpath.py @@ -7,7 +7,7 @@ import pytest from unittest.mock import Mock from PIL import Image -from uiautomator2.xpath import XMLElement, XPathSelector, XPath, XPathElementNotFoundError, is_xpath_syntax_ok +from uiautomator2.xpath import XMLElement, XPath, XPathSelector, XPathEntry, XPathElementNotFoundError, convert_to_camel_case, is_xpath_syntax_ok, safe_xmlstr, str2bytes, strict_xpath mock = Mock() @@ -22,7 +22,21 @@ """ -x = XPath(mock) +x = XPathEntry(mock) + + +def test_safe_xmlstr(): + for input, expect in [ + ('android.widget.TextView', 'android.widget.TextView'), + ('test$123', 'test.123'), + ('$@#&123.456$', '123.456'), + ]: + assert safe_xmlstr(input) == expect + + +def test_str2bytes(): + assert str2bytes(b'123') == b'123' + assert str2bytes('123') == b'123' def test_is_xpath_syntax_ok(): @@ -32,6 +46,28 @@ def test_is_xpath_syntax_ok(): assert is_xpath_syntax_ok("//a[") is False +def test_convert_to_camel_case(): + assert convert_to_camel_case("hello-world") == "helloWorld" + + +def test_strict_xpath(): + for (input, expect) in [ + ("@n1", "//*[@resource-id='n1']"), + ("//TextView", "//TextView"), + ("//TextView[@text='n1']", "//TextView[@text='n1']"), + ("(//TextView)[2]", "(//TextView)[2]"), + ("//TextView/", "//TextView"), # test rstrip / + ]: + assert strict_xpath(input) == expect + + +def test_XPath(): + xp = XPath("//TextView") + assert xp == "//TextView" + assert xp.joinpath("/n1") == "//TextView/n1" + + + def test_xpath_selector(): assert isinstance(x("n1"), XPathSelector) assert isinstance(x("//TextView"), XPathSelector) diff --git a/uiautomator2/__init__.py b/uiautomator2/__init__.py index f62f3706..dd64d810 100644 --- a/uiautomator2/__init__.py +++ b/uiautomator2/__init__.py @@ -24,7 +24,7 @@ from uiautomator2 import xpath from uiautomator2._proto import HTTP_TIMEOUT, SCROLL_STEPS, Direction from uiautomator2._selector import Selector, UiObject -from uiautomator2.exceptions import AdbShellError, BaseException, DeviceError, HierarchyEmptyError, SessionBrokenError +from uiautomator2.exceptions import AdbShellError, BaseException, ConnectError, DeviceError, HierarchyEmptyError, SessionBrokenError from uiautomator2.settings import Settings from uiautomator2.swipe import SwipeExt from uiautomator2.utils import list2cmdline @@ -71,7 +71,10 @@ def _wait_for_device(self, timeout=10) -> adbutils.AdbDevice: wait for device came online, if device is remote, reconnect every 1s Returns: - adbutils.AdbDevice or None + adbutils.AdbDevice + + Raises: + ConnectError """ for d in adbutils.adb.device_list(): if d.serial == self._serial: @@ -98,7 +101,7 @@ def _wait_for_device(self, timeout=10) -> adbutils.AdbDevice: except (adbutils.AdbError, adbutils.AdbTimeout): continue return adb.device(self._serial) - return None + raise ConnectError(f"device {self._serial} not online") @property def adb_device(self) -> adbutils.AdbDevice: @@ -406,12 +409,13 @@ def double_click(self, x, y, duration=0.1): def long_click(self, x, y, duration: float = .5): '''long click at arbitrary coordinates. + Args: duration (float): seconds of pressed ''' x, y = self.pos_rel2abs(x, y) with self._operation_delay("click"): - return self.touch.down(x, y).sleep(duration).up(x, y) + self.jsonrpc.click(x, y, int(duration*1000)) def swipe(self, fx, fy, tx, ty, duration: Optional[float] = None, steps: Optional[int] = None): """ @@ -901,9 +905,9 @@ def make_toast(self, text, duration=1.0): return self.jsonrpc.makeToast(text, duration * 1000) def unlock(self): - """ unlock screen """ + """ unlock screen with swipe from left-bottom to right-top """ if not self.info['screenOn']: - self.press("power") + self.shell("input keyevent WAKEUP") self.swipe(0.1, 0.9, 0.9, 0.1) @@ -1033,8 +1037,8 @@ def watcher(self) -> Watcher: return Watcher(self) @cached_property - def xpath(self) -> xpath.XPath: - return xpath.XPath(self) + def xpath(self) -> xpath.XPathEntry: + return xpath.XPathEntry(self) @cached_property def image(self): diff --git a/uiautomator2/abstract.py b/uiautomator2/abstract.py index 6e9fe03f..867a9977 100644 --- a/uiautomator2/abstract.py +++ b/uiautomator2/abstract.py @@ -5,9 +5,10 @@ """ import abc -from typing import Any, List, NamedTuple, Union +from typing import Any, List, NamedTuple, Tuple, Union import adbutils - +from PIL import Image +from uiautomator2._proto import Direction class ShellResponse(NamedTuple): @@ -46,12 +47,24 @@ class AbstractXPathBasedDevice(metaclass=abc.ABCMeta): def click(self, x: int, y: int): pass + @abc.abstractmethod + def long_click(self, x: int, y: int): + pass + + @abc.abstractmethod + def send_keys(self, text: str): + pass + @abc.abstractmethod def swipe(self, fx: int, fy: int, tx: int, ty: int, duration: float): """ duration is float type, indicate seconds """ @abc.abstractmethod - def window_size(self) -> tuple: + def swipe_ext(self, direction: Direction, scale: float): + pass + + @abc.abstractmethod + def window_size(self) -> Tuple[int, int]: """ return (width, height) """ @abc.abstractmethod @@ -59,5 +72,5 @@ def dump_hierarchy(self) -> str: """ return xml content """ @abc.abstractmethod - def screenshot(self): + def screenshot(self) -> Image.Image: """ return PIL.Image.Image """ diff --git a/uiautomator2/assets/sync.sh b/uiautomator2/assets/sync.sh index 80fc842d..7cacf10d 100755 --- a/uiautomator2/assets/sync.sh +++ b/uiautomator2/assets/sync.sh @@ -3,8 +3,8 @@ set -e -APK_VERSION="2.3.11" -# AGENT_VERSION="0.10.1" +APK_VERSION=$(cat ../version.py| grep apk_version | awk '{print $NF}') +APK_VERSION=${APK_VERSION//[\"\']} cd "$(dirname $0)" @@ -30,7 +30,8 @@ function download_apk(){ # download_atx_agent "$AGENT_VERSION" # echo "atx_agent_version: $AGENT_VERSION" >> version.txt +echo "APK_VERSION: $APK_VERSION" + download_apk "$APK_VERSION" "app-uiautomator.apk" download_apk "$APK_VERSION" "app-uiautomator-test.apk" - echo "apk_version: $APK_VERSION" > version.txt diff --git a/uiautomator2/core.py b/uiautomator2/core.py index 047f8e70..5d7446ce 100644 --- a/uiautomator2/core.py +++ b/uiautomator2/core.py @@ -16,8 +16,10 @@ import adbutils import requests -from uiautomator2.exceptions import RPCInvalidError, UiAutomationNotConnectedError, HTTPError, LaunchUiAutomationError, UiObjectNotFoundError, RPCUnknownError, APKSignatureError, AccessibilityServiceAlreadyRegisteredError +from uiautomator2.exceptions import RPCInvalidError, RPCStackOverflowError, UiAutomationNotConnectedError, HTTPError, LaunchUiAutomationError, UiObjectNotFoundError, RPCUnknownError, APKSignatureError, AccessibilityServiceAlreadyRegisteredError from uiautomator2.abstract import AbstractUiautomatorServer +from uiautomator2.utils import is_version_compatiable +from uiautomator2.version import __apk_version__ logger = logging.getLogger(__name__) @@ -138,6 +140,8 @@ def _jsonrpc_call(dev: adbutils.AdbDevice, method: str, params: Any, timeout: fl raise UiAutomationNotConnectedError("android.os.DeadObjectException") if "uiautomator.UiObjectNotFoundException" in message: raise UiObjectNotFoundError(code, message, params) + if "java.lang.StackOverflowError" in message: + raise RPCStackOverflowError(f"StackOverflowError: {message}", params, stacktrace[:1000] + "..." + stacktrace[-1000:]) raise RPCUnknownError(f"Unknown RPC error: {code} {message}", params, stacktrace) if "result" not in data: @@ -155,7 +159,7 @@ def __init__(self, dev: adbutils.AdbDevice) -> None: self._lock = threading.Lock() self._debug = False self.start_uiautomator() - atexit.register(self.stop_uiautomator) + atexit.register(self.stop_uiautomator, wait=False) @property def debug(self) -> bool: @@ -189,30 +193,24 @@ def _do_start_uiautomator(self): if not self._check_alive(): self._process = launch_uiautomator(self._dev) self._wait_ready() - + def _setup_apks(self): assets_dir = Path(__file__).parent / "assets" main_apk = assets_dir / "app-uiautomator.apk" test_apk = assets_dir / "app-uiautomator-test.apk" - # get apk version - version_text = assets_dir / "version.txt" - apk_version = None - for line in version_text.read_text('utf-8').splitlines(): - k, v = line.split(":") - if k == "apk_version": - apk_version = v.strip() - break - logger.debug("use apk_version: %s", apk_version) + logger.debug("use apk_version: %s", __apk_version__) # install apk when not installed or version not match, dev version always keep main_apk_info = self._dev.app_info("com.github.uiautomator") if main_apk_info is None: self._install_apk(main_apk) - elif main_apk_info.version_name != apk_version: + elif main_apk_info.version_name != __apk_version__: if "dev" in main_apk_info.version_name or "dirty" in main_apk_info.version_name: logger.debug("skip version check for %s", main_apk_info.version_name) + elif is_version_compatiable(__apk_version__, main_apk_info.version_name): + logger.debug("apk version compatiable, expect %s, actual %s", __apk_version__, main_apk_info.version_name) else: - logger.debug("apk version not match, reinstall") + logger.debug("apk version not ok, expect %s, actual %s", __apk_version__, main_apk_info.version_name) self._dev.uninstall("com.github.uiautomator") self._dev.uninstall("com.github.uiautomator.test") self._install_apk(main_apk) @@ -279,12 +277,20 @@ def _wait_ready(self, launch_timeout=30, service_timeout=30): self._dev.shell("am startservice -a com.github.uiautomator.ACTION_START") self._dev.shell("am start -n com.github.uiautomator/.ToastActivity -e showFloatWindow true") self._wait_stub_ready(service_timeout) + time.sleep(1) # wait ATX goto background - def stop_uiautomator(self): + def stop_uiautomator(self, wait=True): with self._lock: if self._process: self._process.kill() self._process = None + # wait server quit + if wait: + deadline = time.time() + 10 + while time.time() < deadline: + if not self._check_alive(): + return + time.sleep(.5) def jsonrpc_call(self, method: str, params: Any = None, timeout: float = 10) -> Any: """Send jsonrpc call to uiautomator2 server""" @@ -292,6 +298,7 @@ def jsonrpc_call(self, method: str, params: Any = None, timeout: float = 10) -> return _jsonrpc_call(self._dev, method, params, timeout, self._debug) except (HTTPError, UiAutomationNotConnectedError) as e: logger.debug("uiautomator2 is not ok, error: %s", e) + self.stop_uiautomator() self.start_uiautomator() return _jsonrpc_call(self._dev, method, params, timeout, self._debug) diff --git a/uiautomator2/exceptions.py b/uiautomator2/exceptions.py index 2a8c741d..c0fdfdc8 100644 --- a/uiautomator2/exceptions.py +++ b/uiautomator2/exceptions.py @@ -30,6 +30,7 @@ class RPCError(BaseException): class RPCUnknownError(RPCError):... class RPCInvalidError(RPCError):... class HierarchyEmptyError(RPCError):... +class RPCStackOverflowError(RPCError):... class NormalError(RPCError): diff --git a/uiautomator2/utils.py b/uiautomator2/utils.py index 5b6caf9d..a0d6130a 100644 --- a/uiautomator2/utils.py +++ b/uiautomator2/utils.py @@ -209,6 +209,32 @@ def inner(self, *args, **kwargs): return inner + +def is_version_compatiable(expect_version: str, actual_version: str) -> bool: + """ + Check if the actual version is compatiable with the expect version + + Args: + expect_version: expect version, e.g. 1.0.0 + actual_version: actual version, e.g. 1.0.0 + + Returns: + bool: True if compatiable, otherwise False + """ + def _parse_version(version: str): + return tuple(map(int, version.split("."))) + + evs = _parse_version(expect_version) + avs = _parse_version(actual_version) + assert len(evs) == len(avs) == 3, "version format error" + if evs[0] == avs[0]: + if evs[1] < avs[1]: + return True + if evs[1] == avs[1]: + return evs[2] <= avs[2] + return False + + if __name__ == "__main__": for n in (1, 10000, 10000000, 10000000000): print(n, natualsize(n)) diff --git a/uiautomator2/version.py b/uiautomator2/version.py index 231a24da..b7bd9470 100644 --- a/uiautomator2/version.py +++ b/uiautomator2/version.py @@ -2,11 +2,13 @@ # # version managed by poetry -__version__ = "0.0.0" +__version__ = '0.0.0' -# See ChangeLog for details -# __apk_version__ = '2.3.3' +# see release note for details +__apk_version__ = '2.3.11' + +# old apk version history # 2.3.3 make float windows smaller # 2.3.2 merge pull requests # require atx-agent>=0.10.0 # 2.3.1 support minicapagent, rotationagent, minitouchagent @@ -39,7 +41,7 @@ # __jar_version__ = 'v0.1.6' # no useless for now. # v0.1.6 first release version -__atx_agent_version__ = '0.10.1' # sync.sh verison should also be updated +# __atx_agent_version__ = '0.10.1' # sync.sh verison should also be updated # 0.10.1 update androidbinary version, https://github.com/openatx/atx-agent/issues/115 # 0.10.0 remove tunnel code, use androidx.test.runner # 0.9.6 fix security reason for remote control device diff --git a/uiautomator2/watcher.py b/uiautomator2/watcher.py index c06faa2e..aec1aec1 100644 --- a/uiautomator2/watcher.py +++ b/uiautomator2/watcher.py @@ -7,10 +7,10 @@ import time import typing from collections import OrderedDict -from typing import Optional +from typing import List, Optional import uiautomator2 -from uiautomator2.xpath import XPath +from uiautomator2.xpath import PageSource, XPathEntry, XPathSelector from uiautomator2.utils import inject_call logger = logging.getLogger(__name__) @@ -91,7 +91,7 @@ def _run(self) -> bool: ok = True last_match = None for xpath in xpaths: - sel = self._d.xpath(xpath, source=source) + sel: XPathSelector = self._d.xpath(xpath, source=source) if not sel.exists: ok = False break @@ -152,7 +152,7 @@ def __init__(self, d: "uiautomator2.Device"): self._triggering = False @property - def _xpath(self) -> XPath: + def _xpath(self) -> XPathEntry: return self._d.xpath def _dump_hierarchy(self): @@ -213,7 +213,7 @@ def _watch_forever(self, interval: float): finally: self._watch_stop_event.set() - def run(self, source: Optional[str] = None): + def run(self, source: Optional[PageSource] = None): """ run watchers Args: source: hierarchy content @@ -231,7 +231,7 @@ def _run_watchers(self, source=None) -> bool: Returns: bool (watched or not) """ - source = source or self._dump_hierarchy() + source = source or self._xpath.get_page_source() for h in self._watchers: last_selector = None @@ -284,13 +284,13 @@ class XPathWatcher(): def __init__(self, parent: Watcher, xpath: str, name: str = ''): self._name = name self._parent = parent - self._xpath_list = [xpath] if xpath else [] + self._xpath_list: List[str] = [xpath] if xpath else [] - def when(self, xpath=None): + def when(self, xpath: str = None): self._xpath_list.append(xpath) return self - def call(self, func): + def call(self, func: callable): """ func accept argument, key(d, el) d=self._d, el=element @@ -302,7 +302,7 @@ def call(self, func): }) def click(self): - def _inner_click(selector): + def _inner_click(selector: XPathSelector): selector.get_last_match().click() self.call(_inner_click) diff --git a/uiautomator2/xpath.py b/uiautomator2/xpath.py index 80a76b3d..9b8e958f 100644 --- a/uiautomator2/xpath.py +++ b/uiautomator2/xpath.py @@ -3,13 +3,14 @@ from __future__ import absolute_import +import abc +import copy +import enum import functools import logging import re -import threading import time -from collections import defaultdict -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union from deprecated import deprecated from PIL import Image @@ -23,8 +24,21 @@ logger = logging.getLogger(__name__) + +class TimeoutException(Exception): + pass + + +class XPathError(Exception): + """basic error for xpath plugin""" + + + def safe_xmlstr(s: str) -> str: - return s.replace("$", "-") + s = re.sub('[$@#&]', '.', s) + s = re.sub('\\.+', '.', s) + s = re.sub('^\\.|\\.$', '', s) + return s def string_quote(s: str) -> str: @@ -50,8 +64,8 @@ def convert_to_camel_case(s: str) -> str: """ Convert a string from kebab-case to camelCase. - :param s: A string in kebab-case format. - :return: A string converted to camelCase format. + Example: + "hello-world" -> "helloWorld" """ parts = s.split('-') # Convert the first letter of each part to uppercase, except for the first part @@ -63,7 +77,7 @@ def strict_xpath(xpath: str) -> str: """make xpath to be computer recognized xpath""" orig_xpath = xpath - if xpath.startswith("/") or xpath.startswith("(/"): + if xpath.lstrip("(").startswith("/"): pass elif xpath.startswith("@"): xpath = "//*[@resource-id={!r}]".format(xpath[1:]) @@ -71,9 +85,6 @@ def strict_xpath(xpath: str) -> str: xpath = "//*[re:match(@text, {0}) or re:match(@content-desc, {0}) or re:match(@resource-id, {0})]".format( string_quote(xpath) ) - # elif xpath.startswith("$"): # special for objects - # key = xpath[1:] - # return self(self.__alias_get(key), source) elif xpath.startswith("%") and xpath.endswith("%"): xpath = "//*[contains(@text, {0}) or contains(@content-desc, {0})]".format( string_quote(xpath[1:-1]) @@ -95,20 +106,60 @@ def strict_xpath(xpath: str) -> str: string_quote(xpath) ) + xpath = xpath.rstrip("/") if not is_xpath_syntax_ok(xpath): raise XPathError("Invalid xpath", orig_xpath) logger.debug("xpath %s -> %s", orig_xpath, xpath) return xpath -class TimeoutException(Exception): - pass +class XPath(str): + def __new__(cls, value, *args): + if isinstance(value, XPath): + return value + xpath = strict_xpath(value) + if args: + return functools.reduce(lambda a, b: a.joinpath(b), args, XPath(xpath)) + else: + return super().__new__(cls, xpath) + + def __repr__(self): + return f'XPath({super().__repr__()})' + + def __and__(self, value: 'XPath') -> 'XPathSelector': + raise NotImplementedError + + def joinpath(self, subpath: str) -> "XPath": + if not subpath.startswith('/'): + subpath = '/' + subpath + return XPath(self + subpath) + + +class PageSource: + def __init__(self, xml_content: str): + self._xml_content = xml_content + + @staticmethod + def parse(data: Optional[Union[str, "PageSource"]]) -> Optional["PageSource"]: + if not data: + return None + if isinstance(data, str): + return PageSource(data) + return data + + @functools.cached_property + def root(self) -> etree._Element: + _root = etree.fromstring(str2bytes(self._xml_content)) + for node in _root.xpath("//node"): + node.tag = safe_xmlstr(node.attrib.pop("class", "")) or "node" + return _root + def find_elements(self, xpath: Union[str, XPath]) -> List["XMLElement"]: + matches = self.root.xpath(xpath, namespaces={"re": "http://exslt.org/regular-expressions"}) + return [XMLElement(node) for node in matches] -class XPathError(Exception): - """basic error for xpath plugin""" -class XPath(object): +class XPathEntry(object): def __init__(self, d: AbstractXPathBasedDevice): """ Args: @@ -118,23 +169,9 @@ def __init__(self, d: AbstractXPathBasedDevice): assert hasattr(d, "wait_timeout") # TODO: remove wait_timeout - self._click_before_delay = 0.0 # pre delay - self._click_after_delay = None # post delay - self._last_source = None - self._event_callbacks = defaultdict(list) - - # used for click("#back") and back is the key - self._alias = {} - self._alias_strict = False - self._dump_lock = threading.Lock() - def global_set(self, key, value): valid_keys = { "timeout", - "alias", - "alias_strict", - "click_after_delay", - "click_before_delay", } if key not in valid_keys: raise ValueError("invalid key", key) @@ -155,52 +192,8 @@ def wait_timeout(self): def _watcher(self): return self._d.watcher - def dump_hierarchy(self): - with self._dump_lock: - self._last_source = self._d.dump_hierarchy() - return self._last_source - - def get_last_hierarchy(self): - return self._last_source - - def add_event_listener(self, event_name, callback): - self._event_callbacks[event_name] += [callback] - - # def register_callback(action: str, callback): - # pass - - def send_click(self, x, y): - if self._click_before_delay: - logger.debug( - "click before delay %.1f seconds", self._click_after_delay - ) - time.sleep(self._click_before_delay) - - # TODO(ssx): should use a better way - # event callbacks for report generate - for callback_func in self._event_callbacks["send_click"]: - callback_func(x, y) - - self._d.click(x, y) - - if self._click_after_delay: - logger.debug("click after delay %.1f seconds", self._click_after_delay) - time.sleep(self._click_after_delay) - - def send_longclick(self, x, y): - self._d.long_click(x, y) - - def send_swipe(self, sx, sy, tx, ty): - self._d.swipe(sx, sy, tx, ty) - - def send_text(self, text: str = None): - self._d.set_fastinput_ime() - self._d.clear_text() - if text: - self._d.send_keys(text) - - def take_screenshot(self) -> Image.Image: - return self._d.screenshot() + def get_page_source(self) -> PageSource: + return PageSource.parse(self._d.dump_hierarchy()) def match(self, xpath, source=None): return len(self(xpath, source).all()) > 0 @@ -235,26 +228,7 @@ def sleep_watch(self, seconds): left_time = max(0, deadline - time.time()) time.sleep(min(0.5, left_time)) - def _get_after_watch(self, xpath: Union[str, list], timeout=None): - if timeout == 0: - timeout = 0.01 - timeout = timeout or self.wait_timeout - logger.info("XPath(timeout %.1f) %s", timeout, xpath) - deadline = time.time() + timeout - while True: - source = self.dump_hierarchy() - - selector = self(xpath, source) - if selector.exists: - return selector.get_last_match() - - if time.time() > deadline: - break - time.sleep(0.5) - - raise TimeoutException("timeout %.1f, xpath: %s" % (timeout, xpath)) - - def click(self, xpath: Union[str, list], timeout=None, pre_delay: float = None): + def click(self, xpath: Union[str, list], timeout: float=None): """ Find element and perform click @@ -266,8 +240,8 @@ def click(self, xpath: Union[str, list], timeout=None, pre_delay: float = None): Raises: TimeoutException """ - el = self._get_after_watch(xpath, timeout) - el.click() # 100ms + selector = XPathSelector(xpath, self) + selector.click(timeout=timeout) def scroll_to( self, @@ -301,87 +275,94 @@ def scroll_to( self._d.swipe_ext(direction, 0.5) return False - def __alias_get(self, key, default=None): - """ - when alias_strict set, if key not in _alias, XPathError will be raised - """ - value = self._alias.get(key, default) - if value is None: - if self._alias_strict: - raise XPathError("alias have not found key", key) - value = key - return value + def __call__(self, xpath: str, source: Union[str, PageSource] = None) -> "XPathSelector": + return XPathSelector(xpath, self, PageSource.parse(source)) - def __call__(self, xpath: str, source=None): - return XPathSelector(self, xpath, source) +class Operator(str, enum.Enum): + AND = 'AND' + OR = 'OR' -class XPathSelector(object): - def __init__(self, parent: XPath, xpath: str = None, source: str = None): - self._parent = parent - self._d = parent._d - self._source = source - self._last_source = None - self._position = None - self._fallback = None - self._xpath_list = (strict_xpath(xpath),) if xpath else () - def __str__(self): - return f"XPathSelector={'|'.join(self._xpath_list)}" +class AbstractSelector(abc.ABC): + @abc.abstractmethod + def all(self, source: PageSource) -> List['XMLElement']: + pass + + +class XPathSelector(AbstractSelector): + def __init__(self, xpath: Union[str, XPath, AbstractSelector], parent: XPathEntry = None, source: Optional[PageSource] = None): + self._base_xpath = XPath(xpath) if isinstance(xpath, str) else xpath + self._operator: Operator = None + self._next_xpath: AbstractSelector = None + self._parent = parent + self._source = source + self._last_source: Optional[PageSource] = None + self._fallback: callable = None + def copy(self) -> "XPathSelector": """copy self""" - new = XPathSelector(self._parent) - new._source = self._source - new._last_source = self._last_source - new._position = self._position - new._fallback = self._fallback - new._xpath_list = self._xpath_list - return new + return copy.copy(self) + + @classmethod + def create(cls, value: Union[str, XPath, 'XPathSelector']) -> 'XPathSelector': + if isinstance(value, XPathSelector): + return value.copy() + elif isinstance(value, (str, XPath)): + return XPathSelector(XPath(value)) + else: + raise ValueError('Invalid value', value) - def xpath(self, _xpath: Union[list, tuple, str]): + def __repr__(self): + if self._operator: + return f'XPathSelector({repr(self._base_xpath)} {self._operator.value} {repr(self._next_xpath)})' + else: + return f'XPathSelector({repr(self._base_xpath)})' + + def __and__(self, value) -> 'XPathSelector': + s = XPathSelector(self) + s._next_xpath = XPathSelector.create(value) + s._operator = Operator.AND + s._parent = self._parent + return s + + def __or__(self, value) -> 'XPathSelector': + s = XPathSelector(self) + s._next_xpath = XPathSelector.create(value) + s._operator = Operator.OR + s._parent = self._parent + return s + + def xpath(self, _xpath: Union[list, tuple, str]) -> 'XPathSelector': """ add xpath to condition list the element should match all conditions + + Deprecated, using a & b instead """ - new = self.copy() if isinstance(_xpath, (list, tuple)): - for xp in _xpath: - new = new.xpath(xp) + return functools.reduce(lambda a, b: a & b, _xpath, self) else: - new._xpath_list = new._xpath_list + (strict_xpath(_xpath),) - return new + return self & _xpath def child(self, _xpath: str) -> "XPathSelector": """ add child xpath """ - if not _xpath.startswith("/"): - _xpath = "/" + _xpath + if self._operator or not isinstance(self._base_xpath, XPath): + raise XPathError("can't use child when base is not XPath or operator is set") new = self.copy() - new._xpath_list = new._xpath_list[:-1] + (new._xpath_list[-1] + _xpath,) - return new - - def position(self, x: float, y: float): - """set possible position""" - assert 0 < x < 1 - assert 0 < y < 1 - new = self.copy() - new._position = (x, y) + new._base_xpath = self._base_xpath.joinpath(_xpath) return new def fallback(self, func: Optional[Callable[..., bool]] = None, *args, **kwargs): """ callback on failure """ - if isinstance(func, str): - if func == "click": - if len(args) == 0: - args = self._position - func = lambda d: d.click(*args) - else: - raise ValueError('func should be "click" or callable function') - + if not callable(func): + raise ValueError('func should be "click" or callable function') + assert callable(func) new = self.copy() new._fallback = func @@ -393,69 +374,43 @@ def _global_timeout(self) -> float: return self._parent.wait_timeout return 20.0 - def all(self, source=None) -> List["XMLElement"]: + def _get_page_source(self) -> PageSource: + if self._source: + return self._source + if not self._parent: + raise XPathError("self._parent is not set") + return self._parent.get_page_source() + + def all(self, source: PageSource=None) -> List["XMLElement"]: """find all matched elements""" - xml_content = source or self._source or self._parent.dump_hierarchy() - self._last_source = xml_content - - # run-watchers - hierarchy = source or self._source - if not hierarchy: - trigger_count = 0 - for _ in range(5): # trigger for most 5 times - triggered = self._parent._watcher.run(xml_content) - if not triggered: - break - trigger_count += 1 - xml_content = self._parent.dump_hierarchy() - if trigger_count: - logger.debug("watcher triggered %d times", trigger_count) - - if hierarchy is None: - root = etree.fromstring(str2bytes(xml_content)) - elif isinstance(hierarchy, (str, bytes)): - root = etree.fromstring(str2bytes(hierarchy)) - elif isinstance(hierarchy, etree._Element): - root = hierarchy - else: - raise TypeError("Unknown type", type(hierarchy)) - - for node in root.xpath("//node"): - node.tag = safe_xmlstr(node.attrib.pop("class", "")) or "node" + if not source: + source = self._get_page_source() + self._last_source = source - match_sets = [] - for xpath in self._xpath_list: - matches = root.xpath( - xpath, namespaces={"re": "http://exslt.org/regular-expressions"} - ) - match_sets.append(matches) - # find out nodes which match all xpaths - match_nodes = functools.reduce(lambda x, y: set(x).intersection(y), match_sets) - els = [XMLElement(node, self._parent) for node in match_nodes] - if not self._position: - return els - - # 中心点应控制在控件内 - inside_els = [] - px, py = self._position - wsize = self._d.window_size() - for e in els: - lpx, lpy, rpx, rpy = e.percent_bounds(wsize=wsize) - # 中心点偏移百分比不应大于控件宽高的50% - scale = 1.5 - - if abs(px - (lpx + rpx) / 2) > (rpx - lpx) * 0.5 * scale: - continue - if abs(py - (lpy + rpy) / 2) > (rpy - lpy) * 0.5 * scale: - continue - inside_els.append(e) - return inside_els + elements = [] + if isinstance(self._base_xpath, XPath): + elements = source.find_elements(self._base_xpath) + else: + elements = self._base_xpath.all(source) + + # AND OR + if self._next_xpath and self._operator: + next_els = self._next_xpath.all(source) + if self._operator == Operator.AND: + elements = list(set(elements) & set(next_els)) + elif self._operator == Operator.OR: + elements = list(set(elements) | set(next_els)) + else: + raise ValueError("Invalid operator", self._operator) + for el in elements: + el._parent = self._parent + return elements @property - def exists(self): + def exists(self) -> bool: return len(self.all()) > 0 - def get(self, timeout=None): + def get(self, timeout=None) -> "XMLElement": """ Get first matched element @@ -469,10 +424,10 @@ def get(self, timeout=None): XPathElementNotFoundError """ if not self.wait(timeout or self._global_timeout): - raise XPathElementNotFoundError(self._xpath_list) + raise XPathElementNotFoundError(self) return self.get_last_match() - def get_last_match(self): + def get_last_match(self) -> "XMLElement": return self.all(self._last_source)[0] def get_text(self) -> Optional[str]: @@ -487,11 +442,10 @@ def get_text(self) -> Optional[str]: """ return self.get().text - def set_text(self, text: str = ""): + def set_text(self, text: str): el = self.get() - self._d.set_fastinput_ime() # switch ime el.click() # focus input-area - self._parent.send_text(text) + self._parent._d.send_keys(text) def wait(self, timeout=None) -> bool: """ wait until element found """ @@ -529,7 +483,7 @@ def wait_gone(self, timeout=None) -> bool: def click_nowait(self): x, y = self.all()[0].center() logger.info("click %d, %d", x, y) - self._parent.send_click(x, y) + self._parent._d.click(x, y) def click(self, timeout=None): """find element and perform click""" @@ -559,7 +513,7 @@ def screenshot(self) -> Image.Image: """take element screenshot""" el = self.get() return el.screenshot() - + def __getattr__(self, key: str): """ In IPython console, attr:_ipython_canary_method_should_not_exist_ will be called @@ -567,12 +521,14 @@ def __getattr__(self, key: str): """ if key.startswith("_"): raise AttributeError("Invalid attr", key) + if not hasattr(XMLElement, key): + raise AttributeError("Invalid attr", key) el = self.get() return getattr(el, key) class XMLElement(object): - def __init__(self, elem, parent: XPath): + def __init__(self, elem: etree._Element, parent: XPathEntry = None): """ Args: elem: lxml node @@ -580,23 +536,16 @@ def __init__(self, elem, parent: XPath): """ self.elem = elem self._parent = parent - self._d = parent._d def __hash__(self): - compared_attrs = ("text", "resource-id", "package", "content-desc") - values = [self.attrib.get(name) for name in compared_attrs] - root = self.elem.getroottree() - fullpath = root.getpath(self.elem) - fullpath = re.sub(r"\[\d+\]", "", fullpath) # remove indexes - values.append(fullpath) - return hash(tuple(values)) + return hash(self.elem) def __eq__(self, value): return self.__hash__() == hash(value) def __repr__(self): x, y = self.center() - return "".format( + return "".format( tag=self.elem.tag, x=x, y=y ) @@ -634,20 +583,20 @@ def click(self): click element, 100ms between down and up """ x, y = self.center() - self._parent.send_click(x, y) + self._parent._d.click(x, y) def long_click(self): """ Sometime long click is needed, 400ms between down and up """ x, y = self.center() - self._parent.send_longclick(x, y) + self._parent._d.long_click(x, y) def screenshot(self): """ Take screenshot of element """ - im = self._parent.take_screenshot() + im = self._parent._d.screenshot() return im.crop(self.bounds) def swipe(self, direction: Union[Direction, str], scale: float = 0.6): @@ -724,17 +673,19 @@ def percent_size(self): """Returns: (float, float): eg, (0.5, 0.5) means 50%, 50% """ - ww, wh = self._d.window_size() + ww, wh = self._parent._d.window_size() _, _, w, h = self.rect return (w / ww, h / wh) - @property - def bounds(self): + @functools.cached_property + def bounds(self) -> Tuple[int, int, int, int]: """ Returns: tuple of (left, top, right, bottom) """ bounds = self.elem.attrib.get("bounds") + if not bounds: + return (0, 0, 0, 0) lx, ly, rx, ry = map(int, re.findall(r"\d+", bounds)) return (lx, ly, rx, ry) @@ -747,11 +698,11 @@ def percent_bounds(self, wsize: Optional[tuple] = None): list of 4 float, eg: 0.1, 0.2, 0.5, 0.8 """ lx, ly, rx, ry = self.bounds - ww, wh = wsize or self._d.window_size() + ww, wh = wsize or self._parent._d.window_size() return (lx / ww, ly / wh, rx / ww, ry / wh) @property - def rect(self): + def rect(self) -> Tuple[int, int, int, int]: """ Returns: (left_top_x, left_top_y, width, height)